pkg/cli/stateless_actions.go (970 lines of code) (raw):

// Copyright (c) 2019 Uber Technologies, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package cli import ( "errors" "fmt" "io" "io/ioutil" "strings" "time" "github.com/uber/peloton/.gen/peloton/api/v0/task" "github.com/uber/peloton/.gen/peloton/api/v1alpha/job/stateless" statelesssvc "github.com/uber/peloton/.gen/peloton/api/v1alpha/job/stateless/svc" v1alphapeloton "github.com/uber/peloton/.gen/peloton/api/v1alpha/peloton" v1alphapod "github.com/uber/peloton/.gen/peloton/api/v1alpha/pod" v1alphaquery "github.com/uber/peloton/.gen/peloton/api/v1alpha/query" v1alpharespool "github.com/uber/peloton/.gen/peloton/api/v1alpha/respool" "github.com/uber/peloton/.gen/peloton/private/jobmgrsvc" jobmgrtask "github.com/uber/peloton/pkg/jobmgr/task" "github.com/golang/protobuf/ptypes" "go.uber.org/yarpc/yarpcerrors" yaml "gopkg.in/yaml.v2" ) const ( statelessJobSummaryFormatHeader = "ID\tName\tOwner\tState\tCreation Time\tTotal\t" + "Running\tSucceeded\tFailed\tKilled\t" + "Workflow Status\tCompleted\tFailed\tCurrent\n" statelessSummaryFormatBody = "%s\t%s\t%s\t%s\t%s\t%d\t%d\t%d\t%d\t%d\t%s\t%d\t%d\t%d\t\n" statelessUpdateEventsFormatHeader = "Type\tTimestamp\tState\t\n" statelessUpdateEventsFormatBody = "%s\t%s\t%s\t\n" workflowEventsV1AlphaFormatHeader = "Workflow State\tWorkflow Type\tTimestamp\n" workflowEventsV1AlphaFormatBody = "%s\t%s\t%s\n" queryPodsFormatHeader = "Pod ID\tName\tState\tContainer Name\tContainer State\tHealthy\tStart Time\tRun Time\t" + "Host\tMessage\tReason\tTermination Status\t\n" queryPodsFormatBody = "%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t\n" podListFormatHeader = "Name\tPod ID\tState\tHealthy\tStart Time\t" + "Host\tMessage\tReason\t\n" podListFormatBody = "%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t\n" ) // StatelessGetCacheAction get cache of stateless job func (c *Client) StatelessGetCacheAction(jobID string) error { resp, err := c.jobmgrClient.GetJobCache( c.ctx, &jobmgrsvc.GetJobCacheRequest{ JobId: &v1alphapeloton.JobID{Value: jobID}, }) if err != nil { return err } out, err := marshallResponse(defaultResponseFormat, resp) if err != nil { return err } fmt.Printf("%v\n", string(out)) return nil } // StatelessRefreshAction refreshes a job func (c *Client) StatelessRefreshAction(jobID string) error { resp, err := c.jobmgrClient.RefreshJob( c.ctx, &jobmgrsvc.RefreshJobRequest{ JobId: &v1alphapeloton.JobID{Value: jobID}, }) if err != nil { return err } out, err := marshallResponse(defaultResponseFormat, resp) if err != nil { return err } fmt.Printf("%v\n", string(out)) return nil } // StatelessWorkflowPauseAction pauses a workflow func (c *Client) StatelessWorkflowPauseAction( jobID string, entityVersion string, opaqueData string, ) error { var opaque *v1alphapeloton.OpaqueData if len(opaqueData) > 0 { opaque = &v1alphapeloton.OpaqueData{Data: opaqueData} } resp, err := c.statelessClient.PauseJobWorkflow( c.ctx, &statelesssvc.PauseJobWorkflowRequest{ JobId: &v1alphapeloton.JobID{Value: jobID}, Version: &v1alphapeloton.EntityVersion{Value: entityVersion}, OpaqueData: opaque, }, ) if err != nil { return err } fmt.Printf("Workflow paused. New EntityVersion: %s\n", resp.GetVersion().GetValue()) return nil } // StatelessWorkflowResumeAction resumes a workflow func (c *Client) StatelessWorkflowResumeAction( jobID string, entityVersion string, opaqueData string, ) error { var opaque *v1alphapeloton.OpaqueData if len(opaqueData) > 0 { opaque = &v1alphapeloton.OpaqueData{Data: opaqueData} } resp, err := c.statelessClient.ResumeJobWorkflow( c.ctx, &statelesssvc.ResumeJobWorkflowRequest{ JobId: &v1alphapeloton.JobID{Value: jobID}, Version: &v1alphapeloton.EntityVersion{Value: entityVersion}, OpaqueData: opaque, }, ) if err != nil { return err } fmt.Printf("Workflow resumed. New EntityVersion: %s\n", resp.GetVersion().GetValue()) return nil } // StatelessWorkflowAbortAction aborts a workflow func (c *Client) StatelessWorkflowAbortAction( jobID string, entityVersion string, opaqueData string, ) error { var opaque *v1alphapeloton.OpaqueData if len(opaqueData) > 0 { opaque = &v1alphapeloton.OpaqueData{Data: opaqueData} } resp, err := c.statelessClient.AbortJobWorkflow( c.ctx, &statelesssvc.AbortJobWorkflowRequest{ JobId: &v1alphapeloton.JobID{Value: jobID}, Version: &v1alphapeloton.EntityVersion{Value: entityVersion}, OpaqueData: opaque, }, ) if err != nil { return err } fmt.Printf("Workflow aborted. New EntityVersion: %s\n", resp.GetVersion().GetValue()) return nil } // StatelessStopJobAction stops a job func (c *Client) StatelessStopJobAction(jobID string, entityVersion string) error { resp, err := c.statelessClient.StopJob( c.ctx, &statelesssvc.StopJobRequest{ JobId: &v1alphapeloton.JobID{Value: jobID}, Version: &v1alphapeloton.EntityVersion{Value: entityVersion}, }, ) if err != nil { return err } fmt.Printf("Job stopped. New EntityVersion: %s\n", resp.GetVersion().GetValue()) return nil } // StatelessQueryAction queries a job given the spec func (c *Client) StatelessQueryAction( labels string, respoolPath string, keywords string, states string, owner string, name string, days uint32, limit uint32, maxLimit uint32, offset uint32, sortBy string, sortOrder string) error { pelotonLabels, err := parseLabels(labels) if err != nil { return err } orderBy, err := parseOrderBy(sortBy, sortOrder) if err != nil { return err } spec := &stateless.QuerySpec{ Pagination: &v1alphaquery.PaginationSpec{ Offset: offset, Limit: limit, OrderBy: orderBy, MaxLimit: maxLimit, }, Labels: pelotonLabels, Keywords: parseKeyWords(keywords), JobStates: parseJobStates(states), Owner: owner, Name: name, } if len(respoolPath) > 0 { spec.Respool = &v1alpharespool.ResourcePoolPath{ Value: respoolPath, } } if days > 0 { now := time.Now().UTC() max, err := ptypes.TimestampProto(now) if err != nil { return err } min, err := ptypes.TimestampProto(now.AddDate(0, 0, -int(days))) if err != nil { return err } spec.CreationTimeRange = &v1alphapeloton.TimeRange{Min: min, Max: max} } resp, err := c.statelessClient.QueryJobs(c.ctx, &statelesssvc.QueryJobsRequest{ Spec: spec, }) if err != nil { return err } printStatelessQueryResponse(resp) tabWriter.Flush() return nil } // StatelessReplaceJobAction updates job by replace its config func (c *Client) StatelessReplaceJobAction( jobID string, spec string, batchSize uint32, respoolPath string, entityVersion string, override bool, maxInstanceRetries uint32, maxTolerableInstanceFailures uint32, rollbackOnFailure bool, startPaused bool, opaqueData string, inPlace bool, startPods bool, ) error { var jobSpec stateless.JobSpec // read the job configuration buffer, err := ioutil.ReadFile(spec) if err != nil { return fmt.Errorf("unable to open file %s: %v", spec, err) } if err := yaml.Unmarshal(buffer, &jobSpec); err != nil { return fmt.Errorf("unable to parse file %s: %v", spec, err) } // fetch the resource pool id respoolID, err := c.LookupResourcePoolID(respoolPath) if err != nil { return err } if respoolID == nil { return fmt.Errorf("unable to find resource pool ID for "+ ":%s", respoolPath) } // Get the entity version if not provided as input. // The assumption is that the customer has ensured that // there is no other ongoing write API request going on. // In case the entityversion changes between Get and Replace, // then let the Replace request fail. if len(entityVersion) == 0 { getRequest := statelesssvc.GetJobRequest{ JobId: &v1alphapeloton.JobID{ Value: jobID, }, SummaryOnly: true, } getResponse, err := c.statelessClient.GetJob( c.ctx, &getRequest) if err != nil { return err } entityVersion = getResponse.GetSummary().GetStatus().GetVersion().GetValue() } // set the resource pool id jobSpec.RespoolId = &v1alphapeloton.ResourcePoolID{Value: respoolID.GetValue()} var opaque *v1alphapeloton.OpaqueData if len(opaqueData) > 0 { opaque = &v1alphapeloton.OpaqueData{Data: opaqueData} } req := &statelesssvc.ReplaceJobRequest{ JobId: &v1alphapeloton.JobID{Value: jobID}, Version: &v1alphapeloton.EntityVersion{Value: entityVersion}, Spec: &jobSpec, UpdateSpec: &stateless.UpdateSpec{ BatchSize: batchSize, RollbackOnFailure: rollbackOnFailure, MaxInstanceRetries: maxInstanceRetries, MaxTolerableInstanceFailures: maxTolerableInstanceFailures, StartPaused: startPaused, InPlace: inPlace, StartPods: startPods, }, OpaqueData: opaque, } resp, err := c.statelessClient.ReplaceJob(c.ctx, req) if err != nil { return err } fmt.Printf("New EntityVersion: %s\n", resp.GetVersion().GetValue()) return nil } // StatelessRollbackJobAction roll backs a job to an older entity version by // getting the old configuration, and triggering a replace action // with the old configiuration. func (c *Client) StatelessRollbackJobAction( jobID string, batchSize uint32, entityVersion string, maxInstanceRetries uint32, maxTolerableInstanceFailures uint32, startPaused bool, opaqueData string, inPlace bool, startPods bool, ) error { // First fetch the previous job configuration. getConfigRequest := statelesssvc.GetJobRequest{ JobId: &v1alphapeloton.JobID{ Value: jobID, }, Version: &v1alphapeloton.EntityVersion{ Value: entityVersion, }, } getConfigResponse, err := c.statelessClient.GetJob( c.ctx, &getConfigRequest, ) if err != nil { return err } jobSpec := getConfigResponse.GetJobInfo().GetSpec() // Get the latest entity version. getRequest := statelesssvc.GetJobRequest{ JobId: &v1alphapeloton.JobID{ Value: jobID, }, SummaryOnly: true, } getResponse, err := c.statelessClient.GetJob( c.ctx, &getRequest, ) if err != nil { return err } version := getResponse.GetSummary().GetStatus().GetVersion().GetValue() jobSpec.Revision = nil var opaque *v1alphapeloton.OpaqueData if len(opaqueData) > 0 { opaque = &v1alphapeloton.OpaqueData{Data: opaqueData} } req := &statelesssvc.ReplaceJobRequest{ JobId: &v1alphapeloton.JobID{Value: jobID}, Version: &v1alphapeloton.EntityVersion{Value: version}, Spec: jobSpec, UpdateSpec: &stateless.UpdateSpec{ BatchSize: batchSize, RollbackOnFailure: false, MaxInstanceRetries: maxInstanceRetries, MaxTolerableInstanceFailures: maxTolerableInstanceFailures, StartPaused: startPaused, InPlace: inPlace, StartPods: startPods, }, OpaqueData: opaque, } resp, err := c.statelessClient.ReplaceJob(c.ctx, req) if err != nil { return err } fmt.Printf("New EntityVersion: %s\n", resp.GetVersion().GetValue()) return nil } // StatelessListJobsAction prints summary of all jobs using the ListJobs API func (c *Client) StatelessListJobsAction() error { defer tabWriter.Flush() stream, err := c.statelessClient.ListJobs( c.ctx, &statelesssvc.ListJobsRequest{}, ) if err != nil { return err } fmt.Fprint(tabWriter, statelessJobSummaryFormatHeader) for { resp, err := stream.Recv() if err == io.EOF { return nil } if err != nil { return err } printListJobsResponse(resp) } } // StatelessReplaceJobDiffAction returns the set of instances which will be // added, removed, updated and remain unchanged for a new // job specification for a given job func (c *Client) StatelessReplaceJobDiffAction( jobID string, spec string, entityVersion string, respoolPath string, ) error { var jobSpec stateless.JobSpec // read the job configuration buffer, err := ioutil.ReadFile(spec) if err != nil { return fmt.Errorf("unable to open file %s: %v", spec, err) } if err := yaml.Unmarshal(buffer, &jobSpec); err != nil { return fmt.Errorf("unable to parse file %s: %v", spec, err) } // fetch the resource pool id respoolID, err := c.LookupResourcePoolID(respoolPath) if err != nil { return err } if respoolID == nil { return fmt.Errorf("unable to find resource pool ID for "+ ":%s", respoolPath) } // set the resource pool id jobSpec.RespoolId = &v1alphapeloton.ResourcePoolID{Value: respoolID.GetValue()} req := &statelesssvc.GetReplaceJobDiffRequest{ JobId: &v1alphapeloton.JobID{Value: jobID}, Spec: &jobSpec, Version: &v1alphapeloton.EntityVersion{Value: entityVersion}, } resp, err := c.statelessClient.GetReplaceJobDiff(c.ctx, req) if err != nil { return err } printResponseJSON(resp) return nil } // StatelessCreateAction is the action for creating a stateless job func (c *Client) StatelessCreateAction( jobID string, respoolPath string, batchSize uint32, cfg string, secretPath string, secret []byte, opaque string, startPaused bool, maxInstanceRetries uint32, maxTolerableInstanceFailures uint32, ) error { respoolID, err := c.LookupResourcePoolID(respoolPath) if err != nil { return err } if respoolID == nil { return fmt.Errorf("unable to find resource pool ID for "+ ":%s", respoolPath) } var jobSpec stateless.JobSpec buffer, err := ioutil.ReadFile(cfg) if err != nil { return fmt.Errorf("unable to open file %s: %v", cfg, err) } if err := yaml.Unmarshal(buffer, &jobSpec); err != nil { return fmt.Errorf("unable to parse file %s: %v", cfg, err) } jobSpec.RespoolId = &v1alphapeloton.ResourcePoolID{Value: respoolID.GetValue()} var opaqueData *v1alphapeloton.OpaqueData if len(opaque) != 0 { opaqueData = &v1alphapeloton.OpaqueData{ Data: opaque, } } var request = &statelesssvc.CreateJobRequest{ JobId: &v1alphapeloton.JobID{ Value: jobID, }, Spec: &jobSpec, OpaqueData: opaqueData, CreateSpec: &stateless.CreateSpec{ BatchSize: batchSize, MaxInstanceRetries: maxInstanceRetries, MaxTolerableInstanceFailures: maxTolerableInstanceFailures, StartPaused: startPaused, }, } // handle secrets if secretPath != "" && len(secret) > 0 { request.Secrets = []*v1alphapeloton.Secret{ jobmgrtask.CreateV1AlphaSecretProto("", secretPath, secret), } } response, err := c.statelessClient.CreateJob(c.ctx, request) printStatelessJobCreateResponse(request, response, err, c.Debug) return err } // StatelessGetAction is the action for getting status // and spec (or only summary) of a stateless job func (c *Client) StatelessGetAction( jobID string, version string, summaryOnly bool, ) error { request := statelesssvc.GetJobRequest{ JobId: &v1alphapeloton.JobID{ Value: jobID, }, SummaryOnly: summaryOnly, } if version != "" { request.Version = &v1alphapeloton.EntityVersion{ Value: version, } } resp, err := c.statelessClient.GetJob( c.ctx, &request) if err != nil { return err } out, err := marshallResponse(defaultResponseFormat, resp) if err != nil { return err } fmt.Printf("%v\n", string(out)) return nil } // StatelessQueryPodsAction is the action for querying pods of a job func (c *Client) StatelessQueryPodsAction( jobID string, states string, names string, hosts string, limit uint32, offset uint32, sortBy string, sortOrder string, ) error { var podStates []v1alphapod.PodState var podNames []*v1alphapeloton.PodName var podHosts []string for _, k := range strings.Split(states, labelSeparator) { if k != "" { p, ok := v1alphapod.PodState_value[k] if !ok { return yarpcerrors.InvalidArgumentErrorf("invalid pod state %s", k) } podStates = append(podStates, v1alphapod.PodState(p)) } } for _, host := range strings.Split(hosts, labelSeparator) { if host != "" { podHosts = append(podHosts, host) } } for _, name := range strings.Split(names, labelSeparator) { if name != "" { podNames = append(podNames, &v1alphapeloton.PodName{Value: name}) } } order := v1alphaquery.OrderBy_ORDER_BY_DESC if sortOrder == "ASC" { order = v1alphaquery.OrderBy_ORDER_BY_ASC } else if sortOrder != "DESC" { return yarpcerrors.InvalidArgumentErrorf("Invalid sort order " + sortOrder) } var sort []*v1alphaquery.OrderBy for _, s := range strings.Split(sortBy, labelSeparator) { if s != "" { propertyPath := &v1alphaquery.PropertyPath{ Value: s, } sort = append(sort, &v1alphaquery.OrderBy{ Order: order, Property: propertyPath, }) } } var request = &statelesssvc.QueryPodsRequest{ JobId: &v1alphapeloton.JobID{ Value: jobID, }, Spec: &v1alphapod.QuerySpec{ PodStates: podStates, Names: podNames, Hosts: podHosts, Pagination: &v1alphaquery.PaginationSpec{ Limit: limit, Offset: offset, OrderBy: sort, }, }, Pagination: &v1alphaquery.PaginationSpec{ Limit: limit, Offset: offset, OrderBy: sort, }, } response, err := c.statelessClient.QueryPods(c.ctx, request) printQueryPodsResponse(response, err, c.Debug) return err } // StatelessListPodsAction is the action to list pods in a job func (c *Client) StatelessListPodsAction( jobID string, instanceRange *task.InstanceRange, ) error { defer tabWriter.Flush() idInstanceRange := &v1alphapod.InstanceIDRange{ From: instanceRange.GetFrom(), To: instanceRange.GetTo(), } stream, err := c.statelessClient.ListPods( c.ctx, &statelesssvc.ListPodsRequest{ JobId: &v1alphapeloton.JobID{Value: jobID}, Range: idInstanceRange, }, ) if err != nil { return err } fmt.Fprint(tabWriter, queryPodsFormatHeader) for { resp, err := stream.Recv() if err == io.EOF { return nil } if err != nil { return err } printStatelessListPodsResponse(resp) } } func printStatelessListPodsResponse(resp *statelesssvc.ListPodsResponse) { for _, pod := range resp.GetPods() { printPod(pod.GetStatus(), pod.GetPodName()) } } // StatelessStartJobAction is the action for starting a stateless job func (c *Client) StatelessStartJobAction( jobID string, entityVersion string, ) error { request := statelesssvc.StartJobRequest{ JobId: &v1alphapeloton.JobID{ Value: jobID, }, Version: &v1alphapeloton.EntityVersion{ Value: entityVersion, }, } resp, err := c.statelessClient.StartJob( c.ctx, &request) if err != nil { return err } fmt.Printf("Job started. New EntityVersion: %s\n", resp.GetVersion().GetValue()) return nil } // StatelessDeleteAction is the action for deleting a stateless job func (c *Client) StatelessDeleteAction( jobID string, version string, forceDelete bool, ) error { _, err := c.statelessClient.DeleteJob( c.ctx, &statelesssvc.DeleteJobRequest{ JobId: &v1alphapeloton.JobID{Value: jobID}, Version: &v1alphapeloton.EntityVersion{Value: version}, Force: forceDelete, }, ) if err != nil { return err } fmt.Printf("Job deleted\n") return nil } func printStatelessQueryResponse(resp *statelesssvc.QueryJobsResponse) { results := resp.GetRecords() if len(results) == 0 { fmt.Fprintf(tabWriter, "No results found\n") return } fmt.Fprint(tabWriter, statelessJobSummaryFormatHeader) for _, r := range results { printStatelessQueryResult(r) } } func printStatelessQueryResult(j *stateless.JobSummary) { creationTime, err := time.Parse(time.RFC3339Nano, j.GetStatus().GetCreationTime()) creationTimeStr := "" if err == nil { creationTimeStr = creationTime.Format(time.RFC3339) } fmt.Fprintf( tabWriter, statelessSummaryFormatBody, j.GetJobId().GetValue(), j.GetName(), j.GetOwningTeam(), j.GetStatus().GetState().String(), creationTimeStr, j.GetInstanceCount(), j.GetStatus().GetPodStats()["POD_STATE_RUNNING"], j.GetStatus().GetPodStats()["POD_STATE_SUCCEEDED"], j.GetStatus().GetPodStats()["POD_STATE_FAILED"], j.GetStatus().GetPodStats()["POD_STATE_KILLED"], j.GetStatus().GetWorkflowStatus().GetState().String(), j.GetStatus().GetWorkflowStatus().GetNumInstancesCompleted(), j.GetStatus().GetWorkflowStatus().GetNumInstancesFailed(), len(j.GetStatus().GetWorkflowStatus().GetInstancesCurrent()), ) } func parseLabels(labels string) ([]*v1alphapeloton.Label, error) { var pelotonLabels []*v1alphapeloton.Label if len(labels) == 0 { return nil, nil } for _, l := range strings.Split(labels, labelSeparator) { labelVals := strings.Split(l, keyValSeparator) if len(labelVals) != 2 { return nil, errors.New("Invalid label" + l) } pelotonLabels = append(pelotonLabels, &v1alphapeloton.Label{ Key: labelVals[0], Value: labelVals[1], }) } return pelotonLabels, nil } func parseKeyWords(keywords string) []string { if len(keywords) == 0 { return nil } var pelotonKeyWords []string for _, k := range strings.Split(keywords, labelSeparator) { if k != "" { pelotonKeyWords = append(pelotonKeyWords, k) } } return pelotonKeyWords } func parseJobStates(states string) []stateless.JobState { if len(states) == 0 { return nil } var pelotonStates []stateless.JobState for _, k := range strings.Split(states, labelSeparator) { if k != "" { pelotonStates = append(pelotonStates, stateless.JobState(stateless.JobState_value[k])) } } return pelotonStates } func parseOrderBy(sortBy string, sortOrder string) ([]*v1alphaquery.OrderBy, error) { if len(sortBy) == 0 || len(sortOrder) == 0 { return nil, nil } order := v1alphaquery.OrderBy_ORDER_BY_DESC if sortOrder == "ASC" { order = v1alphaquery.OrderBy_ORDER_BY_ASC } else if sortOrder != "DESC" { return nil, errors.New("Invalid sort order " + sortOrder) } var sort []*v1alphaquery.OrderBy for _, s := range strings.Split(sortBy, labelSeparator) { if s != "" { propertyPath := &v1alphaquery.PropertyPath{ Value: s, } sort = append(sort, &v1alphaquery.OrderBy{ Order: order, Property: propertyPath, }) } } return sort, nil } func printStatelessJobCreateResponse( req *statelesssvc.CreateJobRequest, resp *statelesssvc.CreateJobResponse, err error, jsonFormat bool, ) { defer tabWriter.Flush() if jsonFormat { printResponseJSON(resp) } else { if err != nil { if yarpcerrors.IsAlreadyExists(err) { fmt.Fprintf(tabWriter, "Job %s already exists: %s\n", req.GetJobId(), err.Error()) } else if yarpcerrors.IsInvalidArgument(err) { fmt.Fprintf(tabWriter, "Invalid job spec: %s\n", err.Error()) } } else if resp.GetJobId() != nil { fmt.Fprintf( tabWriter, "Job %s created. Entity Version: %s\n", resp.GetJobId().GetValue(), resp.GetVersion().GetValue(), ) } else { fmt.Fprint(tabWriter, "Missing job ID in job create response\n") } } } func printListJobsResponse(resp *statelesssvc.ListJobsResponse) { jobs := resp.GetJobs() for _, r := range jobs { printStatelessQueryResult(r) } } // StatelessRestartJobAction restarts a job func (c *Client) StatelessRestartJobAction( jobID string, batchSize uint32, entityVersion string, instanceRanges []*task.InstanceRange, opaqueData string, inPlace bool, ) error { var opaque *v1alphapeloton.OpaqueData if len(opaqueData) > 0 { opaque = &v1alphapeloton.OpaqueData{Data: opaqueData} } var idInstanceRanges []*v1alphapod.InstanceIDRange for _, instanceRange := range instanceRanges { idInstanceRanges = append(idInstanceRanges, &v1alphapod.InstanceIDRange{ From: instanceRange.GetFrom(), To: instanceRange.GetTo(), }) } req := &statelesssvc.RestartJobRequest{ JobId: &v1alphapeloton.JobID{Value: jobID}, Version: &v1alphapeloton.EntityVersion{Value: entityVersion}, RestartSpec: &stateless.RestartSpec{ BatchSize: batchSize, Ranges: idInstanceRanges, InPlace: inPlace, }, OpaqueData: opaque, } resp, err := c.statelessClient.RestartJob(c.ctx, req) if err != nil { return err } fmt.Printf("Job restarted. New EntityVersion: %s\n", resp.GetVersion().GetValue()) return nil } // StatelessListUpdatesAction lists updates of a job func (c *Client) StatelessListUpdatesAction( jobID string, updatesLimit uint32, ) error { resp, err := c.statelessClient.ListJobWorkflows( c.ctx, &statelesssvc.ListJobWorkflowsRequest{ JobId: &v1alphapeloton.JobID{Value: jobID}, UpdatesLimit: updatesLimit, }, ) if err != nil { return err } return printListUpdatesResponse(resp, c.Debug) } func printListUpdatesResponse( resp *statelesssvc.ListJobWorkflowsResponse, debug bool) error { updates := resp.GetWorkflowInfos() if debug { printResponseJSON(resp) return nil } if len(updates) == 0 { fmt.Println("No update for job") } else { for i, u := range updates { fmt.Printf("Index %d:\n", i) if err := printUpdateInfo(u); err != nil { return err } fmt.Printf("\n\n\n") } } return nil } func printUpdateInfo(workflowInfo *stateless.WorkflowInfo) error { out, err := marshallResponse(defaultResponseFormat, workflowInfo) if err != nil { return err } fmt.Printf("%v\n", string(out)) if len(workflowInfo.GetEvents()) != 0 { fmt.Fprint(tabWriter, statelessUpdateEventsFormatHeader) for _, event := range workflowInfo.GetEvents() { fmt.Fprintf( tabWriter, statelessUpdateEventsFormatBody, event.GetType().String(), event.GetTimestamp(), event.GetState().String(), ) } tabWriter.Flush() } return nil } // StatelessWorkflowEventsAction gets most recent active or // completed workflow events for a job func (c *Client) StatelessWorkflowEventsAction( jobID string, instanceID uint32, ) error { resp, err := c.statelessClient.GetWorkflowEvents( c.ctx, &statelesssvc.GetWorkflowEventsRequest{ JobId: &v1alphapeloton.JobID{Value: jobID}, InstanceId: instanceID, }) if err != nil { return err } printWorkflowEventsV1AlphaResponse(resp) return nil } func printWorkflowEventsV1AlphaResponse(r *statelesssvc.GetWorkflowEventsResponse) { defer tabWriter.Flush() fmt.Fprint(tabWriter, workflowEventsV1AlphaFormatHeader) for _, event := range r.GetEvents() { fmt.Fprintf( tabWriter, workflowEventsV1AlphaFormatBody, event.GetState(), event.GetType(), event.GetTimestamp(), ) } } func printQueryPodsResponse(r *statelesssvc.QueryPodsResponse, err error, debug bool) { defer tabWriter.Flush() if debug { printResponseJSON(r) return } if yarpcerrors.IsNotFound(err) { fmt.Fprintf(tabWriter, "Job was not found\n") return } if len(r.GetPods()) == 0 { fmt.Fprint(tabWriter, "No pods found\n") return } fmt.Fprint(tabWriter, queryPodsFormatHeader) for _, t := range r.GetPods() { printPod(t.GetStatus(), t.GetSpec().GetPodName()) } } func printPod(status *v1alphapod.PodStatus, podName *v1alphapeloton.PodName) { for _, container := range status.GetContainersStatus() { // Calculate the start time and run time of the container startTimeStr := "" durationStr := "" startTime, err := time.Parse(time.RFC3339Nano, container.GetStartTime()) if err == nil { startTimeStr = startTime.Format(time.RFC3339) completionTime, err := time.Parse(time.RFC3339Nano, container.GetCompletionTime()) var duration time.Duration if err == nil { duration = completionTime.Sub(startTime) } else { duration = time.Now().Sub(startTime) } durationStr = fmt.Sprintf( "%02d:%02d:%02d", uint(duration.Hours()), uint(duration.Minutes())%60, uint(duration.Seconds())%60, ) } termStatusStr := "" termStatus := container.GetTerminationStatus() if termStatus != nil { termStatusStr = termStatus.GetReason().String() termStatusStr = strings.TrimPrefix(termStatusStr, "TERMINATION_STATUS_REASON_") } // Print the container record fmt.Fprintf( tabWriter, queryPodsFormatBody, status.GetPodId().GetValue(), podName.GetValue(), status.GetState().String(), container.GetName(), container.GetState().String(), container.GetHealthy().GetState().String(), startTimeStr, durationStr, status.GetHost(), container.GetMessage(), container.GetReason(), termStatusStr, ) } }