pkg/cli/job_actions.go (772 lines of code) (raw):
// Copyright (c) 2019 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package cli
import (
"context"
"errors"
"fmt"
"io/ioutil"
"os"
"strings"
"time"
"github.com/uber/peloton/.gen/peloton/api/v0/job"
"github.com/uber/peloton/.gen/peloton/api/v0/peloton"
"github.com/uber/peloton/.gen/peloton/api/v0/query"
"github.com/uber/peloton/.gen/peloton/api/v0/task"
"github.com/uber/peloton/pkg/common/stringset"
"github.com/uber/peloton/pkg/common/util"
jobmgrtask "github.com/uber/peloton/pkg/jobmgr/task"
"github.com/golang/protobuf/ptypes"
"go.uber.org/multierr"
"go.uber.org/yarpc/yarpcerrors"
pb "gopkg.in/cheggaaa/pb.v1"
yaml "gopkg.in/yaml.v2"
)
const (
// labelSeparator separates the entries of comma-delimited CLI arguments
// (labels, keywords, job states and sort properties).
labelSeparator = ","
// keyValSeparator separates a label's key from its value ("key=value").
keyValSeparator = "="
// defaultResponseFormat is the marshalling format used when JSON output
// is not requested.
defaultResponseFormat = "yaml"
// jsonResponseFormat is the marshalling format used when JSON output is
// requested.
jsonResponseFormat = "json"
// jobStopProgressTimeout bounds how long stop progress is polled for each
// job.
jobStopProgressTimeout = 10 * time.Minute
// jobStopProgressRefresh is the interval between stop progress polls.
jobStopProgressRefresh = 5 * time.Second
// jobSummaryFormatHeader and jobSummaryFormatBody are the tab-separated
// header and row formats of the job query summary table.
jobSummaryFormatHeader = "ID\tName\tOwner\tState\tCreation Time\tCompletion Time\tTotal\t" +
"Running\tSucceeded\tFailed\tKilled\t\n"
jobSummaryFormatBody = "%s\t%s\t%s\t%s\t%s\t%s\t%d\t%d\t%d\t%d\t%d\t\n"
// jobStopConfirmationMessage is the prompt shown before stopping jobs that
// were selected by owner/labels rather than by job ID.
jobStopConfirmationMessage = "The above jobs will be stopped. " +
"Are you sure you want to continue?"
)
// JobCreateAction is the action for creating a job.
//
// It resolves respoolPath to a resource pool ID and loads the job
// configuration from the YAML file cfg. It then sets the pool on that
// configuration and submits a create request for jobID. When both
// secretPath and secret are provided, the secret is attached to the
// request. The response is printed as JSON when c.Debug is set, and as a
// short summary otherwise.
func (c *Client) JobCreateAction(
	jobID, respoolPath, cfg, secretPath string, secret []byte,
) error {
	respoolID, err := c.LookupResourcePoolID(respoolPath)
	if err != nil {
		return err
	}
	if respoolID == nil {
		return fmt.Errorf("unable to find resource pool ID for %q", respoolPath)
	}

	var jobConfig job.JobConfig
	buffer, err := ioutil.ReadFile(cfg)
	if err != nil {
		return fmt.Errorf("unable to open file %s: %v", cfg, err)
	}
	if err := yaml.Unmarshal(buffer, &jobConfig); err != nil {
		return fmt.Errorf("unable to parse file %s: %v", cfg, err)
	}

	// TODO remove this once respool is moved out of jobconfig
	// set the resource pool ID
	jobConfig.RespoolID = respoolID

	request := &job.CreateRequest{
		Id:     &peloton.JobID{Value: jobID},
		Config: &jobConfig,
	}
	// Only attach a secret when both its path and its payload were given.
	if secretPath != "" && len(secret) > 0 {
		request.Secrets = []*peloton.Secret{
			jobmgrtask.CreateSecretProto("", secretPath, secret),
		}
	}

	response, err := c.jobClient.Create(c.ctx, request)
	if err != nil {
		return err
	}
	printJobCreateResponse(response, c.Debug)
	return nil
}
// JobDeleteAction deletes the job identified by jobID and prints the
// server's response as JSON.
func (c *Client) JobDeleteAction(jobID string) error {
	resp, err := c.jobClient.Delete(c.ctx, &job.DeleteRequest{
		Id: &peloton.JobID{Value: jobID},
	})
	if err != nil {
		return err
	}
	printResponseJSON(resp)
	return nil
}
// JobGetAction fetches the job identified by jobID and prints it, as JSON
// when c.Debug is set and as YAML otherwise.
func (c *Client) JobGetAction(jobID string) error {
	resp, err := c.jobGet(jobID)
	if err == nil {
		printJobGetResponse(resp, c.Debug)
	}
	return err
}
// jobGet issues a Get request for the job identified by jobID using the
// client's context and returns the raw response.
func (c *Client) jobGet(jobID string) (*job.GetResponse, error) {
	req := &job.GetRequest{Id: &peloton.JobID{Value: jobID}}
	return c.jobClient.Get(c.ctx, req)
}
// JobGetCacheAction calls the GetCache API for jobID and prints the
// response as JSON.
func (c *Client) JobGetCacheAction(jobID string) error {
	req := &job.GetCacheRequest{Id: &peloton.JobID{Value: jobID}}
	resp, err := c.jobClient.GetCache(c.ctx, req)
	if err != nil {
		return err
	}
	printResponseJSON(resp)
	tabWriter.Flush()
	return nil
}
// JobGetActiveJobsAction calls the GetActiveJobs API and prints the
// response as JSON.
func (c *Client) JobGetActiveJobsAction() error {
	req := &job.GetActiveJobsRequest{}
	resp, err := c.jobClient.GetActiveJobs(c.ctx, req)
	if err != nil {
		return err
	}
	printResponseJSON(resp)
	tabWriter.Flush()
	return nil
}
// JobRefreshAction calls the Refresh API for the job identified by jobID.
// The response body is discarded; only the call's error is returned.
func (c *Client) JobRefreshAction(jobID string) error {
	req := &job.RefreshRequest{Id: &peloton.JobID{Value: jobID}}
	if _, err := c.jobClient.Refresh(c.ctx, req); err != nil {
		return err
	}
	return nil
}
// JobStatusAction fetches the job identified by jobID and prints only its
// runtime status, as JSON when c.Debug is set and as YAML otherwise.
func (c *Client) JobStatusAction(jobID string) error {
	// jobGet builds the same GetRequest and calls jobClient.Get with c.ctx.
	resp, err := c.jobGet(jobID)
	if err != nil {
		return err
	}
	printJobStatusResponse(resp, c.Debug)
	return nil
}
// JobQueryAction is the action for getting job ids by labels,
// respool path, keywords, state(s), owner and jobname.
//
// Argument formats:
//   - labels is a comma-separated list of key=value pairs.
//   - keywords, states and sortBy are comma-separated lists. Empty entries
//     are ignored, and job state names may be padded with spaces.
//   - Results are ordered by every sortBy property in sortOrder, which must
//     be "ASC" or "DESC".
//   - When days > 0, only jobs created during the last days days are
//     returned.
//   - limit, maxLimit and offset are passed through as pagination
//     parameters.
//
// Only job summaries are requested. They are printed as a table, or as
// JSON when c.Debug is set.
//
// An unknown job state name or sort order is rejected with an error rather
// than being sent to the server.
func (c *Client) JobQueryAction(
	labels string,
	respoolPath string,
	keywords string,
	states string,
	owner string,
	name string,
	days uint32,
	limit uint32,
	maxLimit uint32,
	offset uint32,
	sortBy string,
	sortOrder string) error {
	var (
		apiLabels []*peloton.Label
		respoolID *peloton.ResourcePoolID
		err       error
	)
	if len(labels) > 0 {
		if apiLabels, err = parsePelotonLabels(labels); err != nil {
			return err
		}
	}
	if len(respoolPath) > 0 {
		if respoolID, err = c.LookupResourcePoolID(respoolPath); err != nil {
			return err
		}
	}

	var apiKeywords []string
	for _, k := range strings.Split(keywords, labelSeparator) {
		if k != "" {
			apiKeywords = append(apiKeywords, k)
		}
	}

	var apiStates []job.JobState
	for _, s := range strings.Split(states, labelSeparator) {
		s = strings.TrimSpace(s)
		if s == "" {
			continue
		}
		// A plain map index yields 0 for unknown names, which would silently
		// filter on the enum's zero value; reject such names instead.
		v, ok := job.JobState_value[s]
		if !ok {
			return fmt.Errorf("invalid job state %q", s)
		}
		apiStates = append(apiStates, job.JobState(v))
	}

	order := query.OrderBy_DESC
	switch sortOrder {
	case "DESC":
	case "ASC":
		order = query.OrderBy_ASC
	default:
		return errors.New("invalid sort order " + sortOrder)
	}

	var sort []*query.OrderBy
	for _, s := range strings.Split(sortBy, labelSeparator) {
		if s == "" {
			continue
		}
		sort = append(sort, &query.OrderBy{
			Order:    order,
			Property: &query.PropertyPath{Value: s},
		})
	}

	spec := &job.QuerySpec{
		Labels:    apiLabels,
		Keywords:  apiKeywords,
		JobStates: apiStates,
		Owner:     owner,
		Name:      name,
		Pagination: &query.PaginationSpec{
			Limit:    limit,
			Offset:   offset,
			OrderBy:  sort,
			MaxLimit: maxLimit,
		},
	}

	if days > 0 {
		now := time.Now().UTC()
		to, err := ptypes.TimestampProto(now)
		if err != nil {
			return err
		}
		from, err := ptypes.TimestampProto(now.AddDate(0, 0, -int(days)))
		if err != nil {
			return err
		}
		spec.CreationTimeRange = &peloton.TimeRange{Min: from, Max: to}
	}

	request := &job.QueryRequest{
		RespoolID:   respoolID,
		Spec:        spec,
		SummaryOnly: true,
	}
	response, err := c.jobClient.Query(c.ctx, request)
	if err != nil {
		return err
	}
	printJobQueryResponse(response, c.Debug)
	return nil
}
// JobUpdateAction submits an update request for the job identified by
// jobID, using the configuration read from the YAML file cfg. When both
// secretPath and secret are provided, the secret is attached to the
// request. The response is printed as JSON when c.Debug is set, and as a
// short summary otherwise.
func (c *Client) JobUpdateAction(
	jobID, cfg, secretPath string, secret []byte) error {
	raw, readErr := ioutil.ReadFile(cfg)
	if readErr != nil {
		return fmt.Errorf("unable to open file %s: %v", cfg, readErr)
	}
	var newConfig job.JobConfig
	if parseErr := yaml.Unmarshal(raw, &newConfig); parseErr != nil {
		return fmt.Errorf("unable to parse file %s: %v", cfg, parseErr)
	}

	req := &job.UpdateRequest{
		Id:     &peloton.JobID{Value: jobID},
		Config: &newConfig,
	}
	// Attach the secret only when both its path and payload are present.
	if secretPath != "" && len(secret) > 0 {
		req.Secrets = []*peloton.Secret{
			jobmgrtask.CreateSecretProto("", secretPath, secret),
		}
	}

	resp, err := c.jobClient.Update(c.ctx, req)
	if err != nil {
		return err
	}
	printJobUpdateResponse(resp, c.Debug)
	return nil
}
// JobStopAction is the action of stopping job(s) by jobID,
// owner and labels.
//
// labels is a comma-separated list of key=value pairs.
//
// With a jobID, the job is stopped only if all of the following hold:
//   - it is owned by owner, when owner is given;
//   - it carries at least one of the labels, when labels are given;
//   - it is not already in a terminal state.
//
// Without a jobID, the server is queried for non-terminal jobs
// (INITIALIZED, PENDING, RUNNING) of owner that match labels.
// jobStopLimit and jobStopMaxLimit are used as the pagination Limit and
// MaxLimit. The matching jobs are printed and then stopped once the user
// confirms; the prompt is skipped when isForceStop is set.
//
// When showProgress is set, per-job stop responses are not printed.
// Instead, progress is polled for every job whose stop request succeeded.
// Errors from individual stop requests and from polling are aggregated and
// returned.
func (c *Client) JobStopAction(
	jobID string,
	showProgress bool,
	owner string,
	labels string,
	isForceStop bool,
	jobStopLimit uint32,
	jobStopMaxLimit uint32,
) error {
	var (
		errs             error
		jobsToStop       []*peloton.JobID
		jobStopLabels    []*peloton.Label
		stoppedJobs      []*peloton.JobID
		stopJobIDs       []string
		stopFailedJobIDs []string
	)

	jobStopLabelSet := stringset.New()
	if len(labels) > 0 {
		var err error
		jobStopLabels, err = parsePelotonLabels(labels)
		if err != nil {
			return err
		}
		for _, label := range jobStopLabels {
			// Trim the same way the job's own labels are trimmed below, so
			// input such as "key = value" can still match.
			jobStopLabelSet.Add(strings.TrimSpace(label.GetKey()) + ":" +
				strings.TrimSpace(label.GetValue()))
		}
	}

	if jobID == "" && owner == "" {
		fmt.Printf("Either jobID or owner needs to be provided.\n")
		tabWriter.Flush()
		return nil
	}

	if jobID != "" {
		// check the job status
		jobGetResponse, err := c.jobGet(jobID)
		if err != nil {
			return err
		}
		jobConfig := jobGetResponse.GetJobInfo().GetConfig()

		// If the job doesn't satisfy owner or label constraints, return
		if owner != "" && jobConfig.GetOwningTeam() != owner {
			fmt.Printf("No matching job found\n")
			tabWriter.Flush()
			return nil
		}
		if len(jobStopLabels) > 0 {
			jobHasStopLabels := false
			// GetLabels is nil-safe if the response carries no config.
			for _, label := range jobConfig.GetLabels() {
				if jobStopLabelSet.Contains(strings.TrimSpace(label.GetKey()) +
					":" + strings.TrimSpace(label.GetValue())) {
					jobHasStopLabels = true
					break
				}
			}
			if !jobHasStopLabels {
				fmt.Fprintf(tabWriter, "No matching job found\n")
				tabWriter.Flush()
				return nil
			}
		}

		state := jobGetResponse.GetJobInfo().GetRuntime().GetState()
		if util.IsPelotonJobStateTerminal(state) {
			fmt.Fprintf(tabWriter, "Job is in terminal state: %s\n", state.String())
			tabWriter.Flush()
			return nil
		}
		jobsToStop = append(jobsToStop, &peloton.JobID{Value: jobID})
	} else {
		spec := &job.QuerySpec{
			JobStates: []job.JobState{
				job.JobState_INITIALIZED,
				job.JobState_PENDING,
				job.JobState_RUNNING,
			},
			Owner:  owner,
			Labels: jobStopLabels,
			Pagination: &query.PaginationSpec{
				Limit:    jobStopLimit,
				MaxLimit: jobStopMaxLimit,
			},
		}
		request := &job.QueryRequest{
			Spec:        spec,
			SummaryOnly: true,
		}
		response, err := c.jobClient.Query(c.ctx, request)
		if err != nil {
			return err
		}
		for _, jobSummary := range response.GetResults() {
			printResponseJSON(jobSummary)
			jobsToStop = append(jobsToStop, jobSummary.GetId())
		}

		// Don't ask the user to confirm stopping an empty set of jobs.
		if len(jobsToStop) == 0 {
			fmt.Fprintf(tabWriter, "No matching job(s) found\n")
			tabWriter.Flush()
			return nil
		}
		if !isForceStop && !askForConfirmation(jobStopConfirmationMessage) {
			return nil
		}
	}

	for _, id := range jobsToStop {
		request := &task.StopRequest{JobId: id}
		response, err := c.taskClient.Stop(c.ctx, request)
		if err != nil {
			stopFailedJobIDs = append(stopFailedJobIDs, id.GetValue())
			errs = multierr.Append(errs, err)
			// There is no response to print or retry, and nothing to poll.
			continue
		}
		stopJobIDs = append(stopJobIDs, id.GetValue())
		stoppedJobs = append(stoppedJobs, id)

		if showProgress {
			continue
		}
		printTaskStopResponse(response, c.Debug)

		// Retry once if some instances could not be stopped.
		if len(response.GetInvalidInstanceIds()) > 0 {
			fmt.Fprint(tabWriter, "Retrying failed tasks\n")
			response, err = c.taskClient.Stop(c.ctx, request)
			if err != nil {
				errs = multierr.Append(errs, err)
				continue
			}
			printTaskStopResponse(response, c.Debug)
		}
	}

	if showProgress {
		// Only poll jobs whose stop request was accepted; polling the others
		// would just wait for the full progress timeout.
		for _, id := range stoppedJobs {
			if err := c.pollStatusWithTimeout(id); err != nil {
				errs = multierr.Append(errs, err)
			}
		}
	}

	fmt.Fprintf(tabWriter, "Stopping jobs: %v\n", stopJobIDs)
	if len(stopFailedJobIDs) != 0 {
		fmt.Fprintf(tabWriter, "Error stopping jobs: %v\n", stopFailedJobIDs)
	}
	tabWriter.Flush()
	return errs
}
// JobRestartAction sends a Restart request for the instanceRanges of jobID,
// with batchSize as the restart batch size, and prints the response as
// JSON.
//
// A non-zero resourceVersion must equal the job's current configuration
// version. With resourceVersion 0, the current version is used, and the
// request is re-sent when the server rejects that version as invalid (see
// retryUntilConcurrencyControlSucceeds).
func (c *Client) JobRestartAction(
	jobID string,
	resourceVersion uint64,
	instanceRanges []*task.InstanceRange,
	batchSize uint32,
) error {
	id := &peloton.JobID{Value: jobID}

	var resp *job.RestartResponse
	restart := func(version uint64) error {
		var callErr error
		resp, callErr = c.jobClient.Restart(c.ctx, &job.RestartRequest{
			Id:              id,
			Ranges:          instanceRanges,
			ResourceVersion: version,
			RestartConfig:   &job.RestartConfig{BatchSize: batchSize},
		})
		return callErr
	}

	if err := c.retryUntilConcurrencyControlSucceeds(id, resourceVersion, restart); err != nil {
		return err
	}
	printResponseJSON(resp)
	return nil
}
// JobStartAction sends a Start request for the instanceRanges of jobID,
// with batchSize as the start batch size, and prints the response as JSON.
//
// A non-zero resourceVersion must equal the job's current configuration
// version. With resourceVersion 0, the current version is used, and the
// request is re-sent when the server rejects that version as invalid (see
// retryUntilConcurrencyControlSucceeds).
func (c *Client) JobStartAction(
	jobID string,
	resourceVersion uint64,
	instanceRanges []*task.InstanceRange,
	batchSize uint32,
) error {
	id := &peloton.JobID{Value: jobID}

	var resp *job.StartResponse
	start := func(version uint64) error {
		var callErr error
		resp, callErr = c.jobClient.Start(c.ctx, &job.StartRequest{
			Id:              id,
			Ranges:          instanceRanges,
			ResourceVersion: version,
			StartConfig:     &job.StartConfig{BatchSize: batchSize},
		})
		return callErr
	}

	if err := c.retryUntilConcurrencyControlSucceeds(id, resourceVersion, start); err != nil {
		return err
	}
	printResponseJSON(resp)
	return nil
}
// JobStopV1BetaAction stops the instanceRanges of jobID through the job
// service's Stop API, with batchSize as the stop batch size, and prints the
// response as JSON.
//
// A non-zero resourceVersion must equal the job's current configuration
// version. With resourceVersion 0, the current version is used, and the
// request is re-sent when the server rejects that version as invalid (see
// retryUntilConcurrencyControlSucceeds).
func (c *Client) JobStopV1BetaAction(
	jobID string,
	resourceVersion uint64,
	instanceRanges []*task.InstanceRange,
	batchSize uint32,
) error {
	id := &peloton.JobID{Value: jobID}

	var resp *job.StopResponse
	stop := func(version uint64) error {
		var callErr error
		resp, callErr = c.jobClient.Stop(c.ctx, &job.StopRequest{
			Id:              id,
			Ranges:          instanceRanges,
			ResourceVersion: version,
			StopConfig:      &job.StopConfig{BatchSize: batchSize},
		})
		return callErr
	}

	if err := c.retryUntilConcurrencyControlSucceeds(id, resourceVersion, stop); err != nil {
		return err
	}
	printResponseJSON(resp)
	return nil
}
// retryUntilConcurrencyControlSucceeds fetches the current configuration
// version of job id and calls fn with it.
//
// If resourceVersion is non-zero, it must equal the job's current
// configuration version. Otherwise an error is returned without calling fn.
//
// If resourceVersion is zero and fn fails with an InvalidArgument error
// whose message equals invalidVersionError, the version is fetched again
// and fn is retried. There is no retry limit. Any other error from the Get
// call or from fn is returned unchanged.
func (c *Client) retryUntilConcurrencyControlSucceeds(
	id *peloton.JobID,
	resourceVersion uint64,
	fn func(
		resourceVersion uint64,
	) error,
) error {
	for {
		jobGetResponse, err := c.jobClient.Get(c.ctx, &job.GetRequest{Id: id})
		if err != nil {
			return err
		}

		jobRuntime := jobGetResponse.GetJobInfo().GetRuntime()
		if jobRuntime == nil {
			// This helper backs restart, start and stop, so the message must
			// not name a single operation.
			return fmt.Errorf("unable to find the runtime of job %s", id.GetValue())
		}

		currentVersion := jobRuntime.GetConfigurationVersion()
		if resourceVersion > 0 && currentVersion != resourceVersion {
			return fmt.Errorf(
				"invalid input resource version current %v provided %v",
				currentVersion, resourceVersion)
		}

		err = fn(currentVersion)
		if err == nil {
			return nil
		}

		// Only retry when the caller did not pin a version and the server
		// rejected the version we just read.
		retryable := resourceVersion == 0 &&
			yarpcerrors.IsInvalidArgument(err) &&
			yarpcerrors.FromError(err).Message() == invalidVersionError
		if !retryable {
			return err
		}
	}
}
// pollStatusWithTimeout displays a progress bar of terminated vs. total
// tasks for job id. The bar refreshes every jobStopProgressRefresh until
// every task has terminated or jobStopProgressTimeout elapses.
//
// If all tasks are already terminated, a one-line summary is printed
// instead of a bar. Reaching the timeout is reported on stderr but is not
// treated as an error; only failures to fetch the job status are returned.
func (c *Client) pollStatusWithTimeout(id *peloton.JobID) error {
	total, terminated, err := c.tasksTerminated(id)
	if err != nil {
		return err
	}
	if total == terminated {
		fmt.Printf("Job %s stopped, total_tasks:%d terminated_tasks:%d\n",
			id.GetValue(), total, terminated)
		return nil
	}

	// init the bar to the total tasks
	bar := pb.Simple.
		Start(int(total)).
		SetTotal(int64(total)).
		SetWidth(150).
		SetRefreshRate(time.Second).
		Set("prefix", fmt.Sprintf("Job %s terminated/total: ", id.GetValue()))
	defer bar.Finish()
	// Show tasks that had already terminated before the first refresh.
	bar.SetCurrent(int64(terminated))

	// Use a stoppable Timer/Ticker: the ticker behind time.Tick can never be
	// released, so every polled job would leak one.
	timeout := time.NewTimer(jobStopProgressTimeout)
	defer timeout.Stop()
	refresh := time.NewTicker(jobStopProgressRefresh)
	defer refresh.Stop()

	for {
		select {
		case <-timeout.C:
			fmt.Fprintf(os.Stderr, "Timed out waiting for job %s to stop\n", id.GetValue())
			return nil
		case <-refresh.C:
			total, terminated, err := c.tasksTerminated(id)
			if err != nil {
				return err
			}
			bar.SetCurrent(int64(terminated))
			if total == terminated {
				return nil
			}
		}
	}
}
// tasksTerminated returns three values for job id: its instance count,
// how many of its tasks are in a terminal state, and any error from
// fetching the job.
//
// If the job itself is in a terminal state, every task is counted as
// terminated. The Get call gets its own 5 second timeout derived from
// context.Background() rather than c.ctx (presumably so long progress
// polling is not bound by the CLI's request context; confirm).
func (c *Client) tasksTerminated(id *peloton.JobID) (uint32, uint32, error) {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	resp, err := c.jobClient.Get(ctx, &job.GetRequest{Id: id})
	if err != nil {
		return 0, 0, err
	}

	info := resp.GetJobInfo()
	total := info.GetConfig().GetInstanceCount()
	rt := info.GetRuntime()
	if util.IsPelotonJobStateTerminal(rt.GetState()) {
		return total, total, nil
	}

	// Sum the task counts of every terminal task state.
	var done uint32
	for stateName, count := range rt.GetTaskStats() {
		st := task.TaskState(task.TaskState_value[stateName])
		if util.IsPelotonStateTerminal(st) {
			done += count
		}
	}
	return total, done, nil
}
// printJobUpdateResponse prints the result of a job update.
//
// When jsonFormat is set, the raw response is printed as JSON. Otherwise a
// human-readable summary is written to tabWriter and flushed: either the
// error, or the updated job ID together with the server's message.
// Nil-safe getters are used throughout, so partially populated responses
// cannot cause a panic.
func printJobUpdateResponse(r *job.UpdateResponse, jsonFormat bool) {
	if jsonFormat {
		printResponseJSON(r)
		return
	}

	if updateErr := r.GetError(); updateErr != nil {
		switch {
		case updateErr.GetJobNotFound() != nil:
			notFound := updateErr.GetJobNotFound()
			fmt.Fprintf(tabWriter, "Job %s not found: %s\n",
				notFound.GetId().GetValue(), notFound.GetMessage())
		case updateErr.GetInvalidConfig() != nil:
			fmt.Fprintf(tabWriter, "Invalid job config: %s\n",
				updateErr.GetInvalidConfig().GetMessage())
		default:
			// Report error kinds not handled above instead of printing nothing.
			fmt.Fprintf(tabWriter, "Error updating job: %s\n", updateErr.String())
		}
	} else if r.GetId() != nil {
		fmt.Fprintf(tabWriter, "Job %s updated\n", r.GetId().GetValue())
		fmt.Fprintf(tabWriter, "Message: %s\n", r.GetMessage())
	}
	tabWriter.Flush()
}
// printJobCreateResponse prints the result of a job creation.
//
// When jsonFormat is set, the raw response is printed as JSON. Otherwise a
// human-readable summary is written to tabWriter and flushed: the specific
// error, the created job ID, or a note that the ID is missing. Nil-safe
// getters are used, so partially populated errors cannot cause a panic.
func printJobCreateResponse(r *job.CreateResponse, jsonFormat bool) {
	if jsonFormat {
		printResponseJSON(r)
		return
	}

	if createErr := r.GetError(); createErr != nil {
		switch {
		case createErr.GetAlreadyExists() != nil:
			exists := createErr.GetAlreadyExists()
			fmt.Fprintf(tabWriter, "Job %s already exists: %s\n",
				exists.GetId().GetValue(), exists.GetMessage())
		case createErr.GetInvalidConfig() != nil:
			fmt.Fprintf(tabWriter, "Invalid job config: %s\n",
				createErr.GetInvalidConfig().GetMessage())
		case createErr.GetInvalidJobId() != nil:
			invalid := createErr.GetInvalidJobId()
			fmt.Fprintf(tabWriter, "Invalid job ID: %v, message: %v\n",
				invalid.GetId().GetValue(), invalid.GetMessage())
		default:
			// Report error kinds not handled above instead of printing nothing.
			fmt.Fprintf(tabWriter, "Error creating job: %s\n", createErr.String())
		}
	} else if r.GetJobId() != nil {
		fmt.Fprintf(tabWriter, "Job %s created\n", r.GetJobId().GetValue())
	} else {
		fmt.Fprint(tabWriter, "Missing job ID in job create response\n")
	}
	tabWriter.Flush()
}
// printJobGetResponse prints a job Get response, marshalled as JSON when
// jsonFormat is set and as YAML otherwise.
//
// If the response has no job info, or marshalling fails, an error line is
// written to tabWriter instead and no output is printed. tabWriter is
// flushed on every path.
func printJobGetResponse(r *job.GetResponse, jsonFormat bool) {
	defer tabWriter.Flush()

	if r.GetJobInfo() == nil {
		fmt.Fprint(tabWriter, "Unable to get job \n")
		return
	}

	format := defaultResponseFormat
	if jsonFormat {
		format = jsonResponseFormat
	}
	out, err := marshallResponse(format, r)
	if err != nil {
		// Don't go on to print the unusable marshal output.
		fmt.Fprintf(tabWriter, "Unable to marshall response: %v\n", err)
		return
	}
	fmt.Printf("%v\n", string(out))
}
// printJobStatusResponse prints only the runtime section of a job Get
// response, marshalled as JSON when jsonFormat is set and as YAML
// otherwise.
//
// Missing runtime info and marshalling failures are reported as error
// lines on tabWriter. tabWriter is flushed on every path, so the
// marshalling error is no longer left unflushed.
func printJobStatusResponse(r *job.GetResponse, jsonFormat bool) {
	defer tabWriter.Flush()

	// GetRuntime is nil-safe, so this also covers a missing JobInfo.
	ri := r.GetJobInfo().GetRuntime()
	if ri == nil {
		fmt.Fprint(tabWriter, "Unable to get job status\n")
		return
	}

	format := defaultResponseFormat
	if jsonFormat {
		format = jsonResponseFormat
	}
	out, err := marshallResponse(format, ri)
	if err != nil {
		fmt.Fprintf(tabWriter, "Unable to marshall response: %v\n", err)
		return
	}
	fmt.Printf("%v\n", string(out))
}
// printJobQueryResult writes one summary row for j to tabWriter, using
// jobSummaryFormatBody.
//
// Creation and completion times are parsed as RFC3339Nano and written as
// RFC3339. A creation time that fails to parse is left blank. A completion
// time that fails to parse (it is empty for active jobs) is shown as "--".
func printJobQueryResult(j *job.JobSummary) {
	rt := j.GetRuntime()

	var created string
	if t, err := time.Parse(time.RFC3339Nano, rt.GetCreationTime()); err == nil {
		created = t.Format(time.RFC3339)
	}

	completed := "--"
	if t, err := time.Parse(time.RFC3339Nano, rt.GetCompletionTime()); err == nil {
		completed = t.Format(time.RFC3339)
	}

	stats := rt.GetTaskStats()
	fmt.Fprintf(
		tabWriter,
		jobSummaryFormatBody,
		j.GetId().GetValue(),
		j.GetName(),
		j.GetOwningTeam(),
		rt.GetState().String(),
		created,
		completed,
		j.GetInstanceCount(),
		stats["RUNNING"],
		stats["SUCCEEDED"],
		stats["FAILED"],
		stats["KILLED"],
	)
}
// printJobQueryResponse prints a job query response.
//
// When jsonFormat is set, the raw response is printed as JSON. Otherwise
// the output goes to tabWriter, which is then flushed. It is one of:
//   - the query error;
//   - "No jobs found.";
//   - a header followed by one summary row per job.
func printJobQueryResponse(r *job.QueryResponse, jsonFormat bool) {
	if jsonFormat {
		printResponseJSON(r)
		return
	}
	defer tabWriter.Flush()

	if queryErr := r.GetError(); queryErr != nil {
		fmt.Fprintf(tabWriter, "Error: %v\n", queryErr.String())
		return
	}

	results := r.GetResults()
	if len(results) == 0 {
		// The error is known to be nil here, so appending its String()
		// would only print junk after the message.
		fmt.Fprint(tabWriter, "No jobs found.\n")
		return
	}

	fmt.Fprint(tabWriter, jobSummaryFormatHeader)
	for _, summary := range results {
		printJobQueryResult(summary)
	}
}
// parsePelotonLabels parses a comma-separated list of key=value pairs
// (e.g. "team=infra,env=prod") into Peloton labels, preserving input order.
//
// Parsing rules:
//   - Empty entries (from leading, trailing or doubled commas) are skipped,
//     matching how keyword and state lists are parsed.
//   - Only the first "=" separates the key from the value, so values may
//     themselves contain "=".
//   - An entry without "=" is rejected with an error. The error is returned
//     rather than printed, so the caller decides how to report it.
func parsePelotonLabels(labels string) ([]*peloton.Label, error) {
	var pelotonLabels []*peloton.Label
	for _, entry := range strings.Split(labels, labelSeparator) {
		if entry == "" {
			continue
		}
		kv := strings.SplitN(entry, keyValSeparator, 2)
		if len(kv) != 2 {
			return nil, errors.New("invalid label " + entry)
		}
		pelotonLabels = append(pelotonLabels, &peloton.Label{
			Key:   kv[0],
			Value: kv[1],
		})
	}
	return pelotonLabels, nil
}
// askForConfirmation prints s followed by " [y/n]: " and reads one line
// from standard input with fmt.Scanln.
//
// Answers are matched case-insensitively after trimming. "y" or "yes"
// return true, and "n" or "no" return false. Any other answer repeats the
// prompt. If the read itself fails (for example at EOF, or on an empty
// line), false is returned.
func askForConfirmation(s string) bool {
	for {
		fmt.Printf("%s [y/n]: ", s)

		var answer string
		if _, err := fmt.Scanln(&answer); err != nil {
			return false
		}

		switch strings.ToLower(strings.TrimSpace(answer)) {
		case "y", "yes":
			return true
		case "n", "no":
			return false
		}
	}
}