pkg/cli/task_actions.go (533 lines of code) (raw):
// Copyright (c) 2019 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package cli
import (
"errors"
"fmt"
"io/ioutil"
"net/http"
"sort"
"strings"
"time"
"github.com/uber/peloton/.gen/peloton/api/v0/peloton"
"github.com/uber/peloton/.gen/peloton/api/v0/query"
"github.com/uber/peloton/.gen/peloton/api/v0/task"
)
// Tab-separated header and row formats for the tables this file writes to
// tabWriter. Each *Body format must carry one verb per column of its *Header.
const (
// taskListFormatHeader is the column header row printed above task rows.
taskListFormatHeader = "Instance\tName\tState\tHealthy\tStart Time\tRun Time\t" +
"Host\tMessage\tReason\tTermination Status\t\n"
// taskListFormatBody is the row format used by printTask; its ten verbs line
// up with the ten columns of taskListFormatHeader.
taskListFormatBody = "%d\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t\n"
// podEventsFormatHeader is the column header row printed above pod events.
podEventsFormatHeader = "Mesos Task Id\tDesired Mesos Task Id\tActual State\tGoal State\tConfig Version\tDesired Config Version\tHealthy\tHost\tMessage\tReason\tUpdate Time\t\n"
// podEventsFormatBody is the row format for a single pod event; its eleven
// verbs line up with the eleven columns of podEventsFormatHeader.
podEventsFormatBody = "%s\t%s\t%s\t%s\t%d\t%d\t%s\t%s\t%s\t%s\t%s\t\n"
)
// sortedTaskInfoList is a slice of TaskInfo pointers that implements
// sort.Interface, ordering its entries by ascending instance ID.
type sortedTaskInfoList []*task.TaskInfo

// Len reports how many tasks the list holds.
func (l sortedTaskInfoList) Len() int {
	return len(l)
}

// Swap exchanges the tasks at positions i and j.
func (l sortedTaskInfoList) Swap(i, j int) {
	l[j], l[i] = l[i], l[j]
}

// Less reports whether the task at position i has a smaller instance ID than
// the task at position j.
func (l sortedTaskInfoList) Less(i, j int) bool {
	return l[j].InstanceId > l[i].InstanceId
}
// TaskGetAction is the action to get a single task instance of a job. The
// response is rendered by printTaskGetResponse; an error is returned only
// when the Get call itself fails.
func (c *Client) TaskGetAction(jobID string, instanceID uint32) error {
	req := &task.GetRequest{
		JobId:      &peloton.JobID{Value: jobID},
		InstanceId: instanceID,
	}

	resp, err := c.taskClient.Get(c.ctx, req)
	if err == nil {
		printTaskGetResponse(resp, c.Debug)
	}
	return err
}
// TaskGetCacheAction is the action to get the cache entry of a task
// instance. The response is handed to printResponseJSON and tabWriter is
// flushed afterwards.
func (c *Client) TaskGetCacheAction(jobID string, instanceID uint32) error {
	jobRef := &peloton.JobID{Value: jobID}

	resp, err := c.taskClient.GetCache(c.ctx, &task.GetCacheRequest{
		JobId:      jobRef,
		InstanceId: instanceID,
	})
	if err != nil {
		return err
	}

	printResponseJSON(resp)
	tabWriter.Flush()
	return nil
}
// TaskLogsGetAction is the action to get a log file of a given job instance
// and print its content to stdout.
//
// It browses the sandbox of the instance (jobID, instanceID and taskID are
// forwarded in the browse request), selects the sandbox path that ends in
// fileName, and downloads that file over HTTP from the hostname and port
// reported in the browse response.
//
// An error is returned when the browse call fails or reports an error, when
// no sandbox path ends in fileName, when the HTTP request fails, when the
// server answers with a non-2xx status, or when the body cannot be read.
func (c *Client) TaskLogsGetAction(fileName string, jobID string, instanceID uint32, taskID string) error {
	request := &task.BrowseSandboxRequest{
		JobId: &peloton.JobID{
			Value: jobID,
		},
		InstanceId: instanceID,
		TaskId:     taskID,
	}

	response, err := c.taskClient.BrowseSandbox(c.ctx, request)
	if err != nil {
		return err
	}
	if response.GetError() != nil {
		return errors.New(response.Error.String())
	}

	// When several sandbox paths end in fileName, the last one listed wins.
	var filePath string
	for _, path := range response.GetPaths() {
		if strings.HasSuffix(path, fileName) {
			filePath = path
		}
	}
	if len(filePath) == 0 {
		return fmt.Errorf(
			"filename:%s not found in sandbox files: %s",
			fileName,
			response.GetPaths())
	}

	// NOTE(review): filePath is placed in the query string unescaped; paths
	// containing characters such as '&' or '#' would need net/url escaping.
	logFileDownloadURL := fmt.Sprintf(
		"http://%s:%s/files/download?path=%s",
		response.GetHostname(),
		response.GetPort(),
		filePath)

	resp, err := http.Get(logFileDownloadURL)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	// A non-2xx answer carries an error page rather than the log file, so
	// report it as an error instead of printing it as if it were the log.
	if resp.StatusCode < http.StatusOK || resp.StatusCode >= http.StatusMultipleChoices {
		return fmt.Errorf(
			"downloading %s: unexpected HTTP status %s",
			logFileDownloadURL,
			resp.Status)
	}

	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		return err
	}

	fmt.Printf("\n\n%s", body)
	return nil
}
// TaskGetEventsAction is the deprecated action to get the pod events of a
// task instance. It writes a deprecation notice to tabWriter and then prints
// the events exactly like PodGetEventsAction does.
func (c *Client) TaskGetEventsAction(jobID string, instanceID uint32) error {
	req := &task.GetPodEventsRequest{
		JobId:      &peloton.JobID{Value: jobID},
		InstanceId: instanceID,
	}

	resp, err := c.taskClient.GetPodEvents(c.ctx, req)
	if err != nil {
		return err
	}

	const deprecationNotice = "** Task Events CLI is deprecated, please use Pod Events CLI **\n"
	fmt.Fprint(tabWriter, deprecationNotice)

	printPodGetEventsResponse(resp, c.Debug)
	return nil
}
// PodGetEventsAction is the action to get and print the pod events of a job
// instance. runID and limit are forwarded to the service unchanged.
//
// NOTE(review): events are expected in reverse chronological order; that
// ordering is decided by the service and is not enforced here.
func (c *Client) PodGetEventsAction(
	jobID string,
	instanceID uint32,
	runID string,
	limit uint64) error {
	resp, err := c.taskClient.GetPodEvents(c.ctx, &task.GetPodEventsRequest{
		JobId:      &peloton.JobID{Value: jobID},
		InstanceId: instanceID,
		RunId:      runID,
		Limit:      limit,
	})
	if err == nil {
		printPodGetEventsResponse(resp, c.Debug)
	}
	return err
}
// TaskListAction is the action to list the tasks of a job, restricted to
// instanceRange as passed by the caller, and print them as a table.
func (c *Client) TaskListAction(jobID string, instanceRange *task.InstanceRange) error {
	jobRef := &peloton.JobID{Value: jobID}
	req := &task.ListRequest{
		JobId: jobRef,
		Range: instanceRange,
	}

	resp, err := c.taskClient.List(c.ctx, req)
	if err != nil {
		return err
	}

	printTaskListResponse(resp, c.Debug)
	return nil
}
// TaskQueryAction is the action to query the tasks of a job.
//
// states, names, hosts and sortBy are lists joined by labelSeparator; empty
// entries are ignored. Every entry of states must be a key of
// task.TaskState_value, otherwise an error is returned (previously an unknown
// name was silently turned into state 0). sortOrder must be "ASC" or "DESC"
// and is applied to every sortBy property. limit and offset are passed
// through in the pagination spec.
//
// The response is rendered by printTaskQueryResponse; besides the validation
// errors above, an error is returned only when the Query call fails.
func (c *Client) TaskQueryAction(
	jobID string,
	states string,
	names string,
	hosts string,
	limit uint32,
	offset uint32,
	sortBy string,
	sortOrder string) error {
	// splitList breaks a labelSeparator-joined list into its non-empty
	// entries; it yields nil when there are none.
	splitList := func(joined string) []string {
		var entries []string
		for _, entry := range strings.Split(joined, labelSeparator) {
			if entry != "" {
				entries = append(entries, entry)
			}
		}
		return entries
	}

	var taskStates []task.TaskState
	for _, name := range splitList(states) {
		// A plain map lookup would yield 0 for an unknown name and query
		// for the wrong state, so reject unknown names explicitly.
		value, ok := task.TaskState_value[name]
		if !ok {
			return errors.New("Invalid task state " + name)
		}
		taskStates = append(taskStates, task.TaskState(value))
	}

	order := query.OrderBy_DESC
	switch sortOrder {
	case "DESC":
		// Already the default.
	case "ASC":
		order = query.OrderBy_ASC
	default:
		return errors.New("Invalid sort order " + sortOrder)
	}

	// Named orderBy rather than sort so the imported sort package is not
	// shadowed inside this function.
	var orderBy []*query.OrderBy
	for _, property := range splitList(sortBy) {
		orderBy = append(orderBy, &query.OrderBy{
			Order:    order,
			Property: &query.PropertyPath{Value: property},
		})
	}

	request := &task.QueryRequest{
		JobId: &peloton.JobID{
			Value: jobID,
		},
		Spec: &task.QuerySpec{
			TaskStates: taskStates,
			Names:      splitList(names),
			Hosts:      splitList(hosts),
			Pagination: &query.PaginationSpec{
				Limit:   limit,
				Offset:  offset,
				OrderBy: orderBy,
			},
		},
	}

	response, err := c.taskClient.Query(c.ctx, request)
	if err != nil {
		return err
	}

	printTaskQueryResponse(response, c.Debug)
	return nil
}
// TaskRefreshAction calls the task refresh API for the given job and
// instance range. Nothing is printed; only the call's error is reported.
func (c *Client) TaskRefreshAction(jobID string, instanceRange *task.InstanceRange) error {
	req := &task.RefreshRequest{
		JobId: &peloton.JobID{Value: jobID},
		Range: instanceRange,
	}

	if _, err := c.taskClient.Refresh(c.ctx, req); err != nil {
		return err
	}
	return nil
}
// TaskStartAction is the action to start the tasks of a job that fall in
// instanceRanges. The outcome is rendered by printTaskStartResponse.
func (c *Client) TaskStartAction(jobID string, instanceRanges []*task.InstanceRange) error {
	resp, err := c.taskClient.Start(c.ctx, &task.StartRequest{
		JobId:  &peloton.JobID{Value: jobID},
		Ranges: instanceRanges,
	})
	if err != nil {
		return err
	}

	printTaskStartResponse(resp, c.Debug)
	return nil
}
// TaskStopAction is the action to stop the tasks of a job that fall in
// instanceRanges. When the first response lists invalid instance IDs, the
// very same request is sent one more time and its response printed as well.
func (c *Client) TaskStopAction(jobID string,
	instanceRanges []*task.InstanceRange) error {
	req := &task.StopRequest{
		JobId:  &peloton.JobID{Value: jobID},
		Ranges: instanceRanges,
	}

	resp, err := c.taskClient.Stop(c.ctx, req)
	if err != nil {
		return err
	}
	printTaskStopResponse(resp, c.Debug)

	// Nothing was reported invalid, so there is nothing to retry.
	if len(resp.GetInvalidInstanceIds()) == 0 {
		return nil
	}

	// Single retry with the unchanged request.
	fmt.Fprint(tabWriter, "Retrying failed tasks\n")
	resp, err = c.taskClient.Stop(c.ctx, req)
	if err != nil {
		return err
	}
	printTaskStopResponse(resp, c.Debug)
	return nil
}
// TaskRestartAction is the action to restart the tasks of a job that fall in
// instanceRanges. The outcome is rendered by printTaskRestartResponse.
func (c *Client) TaskRestartAction(jobID string, instanceRanges []*task.InstanceRange) error {
	req := &task.RestartRequest{
		JobId:  &peloton.JobID{Value: jobID},
		Ranges: instanceRanges,
	}

	resp, err := c.taskClient.Restart(c.ctx, req)
	if err == nil {
		printTaskRestartResponse(resp, c.Debug)
	}
	return err
}
// printTask writes the single-row output of the task to tabWriter using
// taskListFormatBody: instance ID, name, state, health, start time, run
// time, host, message, reason and termination status.
//
// Start time and run time are left blank when the runtime start time does
// not parse as RFC 3339. The run time is shown as HH:MM:SS and never goes
// below 00:00:00.
func printTask(t *task.TaskInfo) {
	cfg := t.GetConfig()
	runtime := t.GetRuntime()

	// Calculate the start time and run time of the task.
	startTimeStr := ""
	durationStr := ""
	if startTime, err := time.Parse(time.RFC3339Nano, runtime.GetStartTime()); err == nil {
		startTimeStr = startTime.Format(time.RFC3339)

		// Measure up to the completion time when there is a parsable one,
		// and up to now otherwise.
		var duration time.Duration
		if completionTime, err := time.Parse(time.RFC3339Nano, runtime.GetCompletionTime()); err == nil {
			duration = completionTime.Sub(startTime)
		} else {
			duration = time.Since(startTime)
		}

		// Clock skew, or a completion time older than the start time, makes
		// the duration negative; clamp it so the columns stay meaningful
		// (converting a negative float to uint is implementation-dependent).
		if duration < 0 {
			duration = 0
		}

		totalSeconds := int64(duration / time.Second)
		durationStr = fmt.Sprintf(
			"%02d:%02d:%02d",
			totalSeconds/3600,
			(totalSeconds/60)%60,
			totalSeconds%60,
		)
	}

	// The enum prefix is dropped to keep the column short.
	termStatusStr := ""
	if termStatus := runtime.GetTerminationStatus(); termStatus != nil {
		termStatusStr = strings.TrimPrefix(
			termStatus.GetReason().String(),
			"TERMINATION_STATUS_REASON_",
		)
	}

	// Print the task record.
	fmt.Fprintf(
		tabWriter,
		taskListFormatBody,
		t.GetInstanceId(),
		cfg.GetName(),
		runtime.GetState().String(),
		runtime.GetHealthy().String(),
		startTimeStr,
		durationStr,
		runtime.GetHost(),
		runtime.GetMessage(),
		runtime.GetReason(),
		termStatusStr,
	)
}
// printTaskGetResponse writes the outcome of a task Get call to tabWriter
// and flushes it. With debug set the raw response is handed to
// printResponseJSON instead. Otherwise exactly one of the following is
// printed, checked in this order: the job-not-found message, the
// instance-out-of-range message, the task row under its header, or a generic
// "no results" message.
//
// Nested response fields are read through the generated getters so that an
// unset sub-message prints as empty instead of causing a nil dereference.
func printTaskGetResponse(r *task.GetResponse, debug bool) {
	defer tabWriter.Flush()

	if debug {
		printResponseJSON(r)
		return
	}

	switch {
	case r.GetNotFound() != nil:
		fmt.Fprintf(tabWriter, "Job %s was not found: %s\n",
			r.GetNotFound().GetId().GetValue(),
			r.GetNotFound().GetMessage())
	case r.GetOutOfRange() != nil:
		fmt.Fprintf(tabWriter,
			"Requested instance of job %s is not within the range of valid "+
				"instances (0...%d)\n",
			r.GetOutOfRange().GetJobId().GetValue(),
			r.GetOutOfRange().GetInstanceCount())
	case r.GetResult() != nil:
		fmt.Fprint(tabWriter, taskListFormatHeader)
		printTask(r.GetResult())
	default:
		fmt.Fprint(tabWriter, "Unexpected error, no results in response.\n")
	}
}
// printPodGetEventsResponse writes the pod events of the response to
// tabWriter and flushes it. With debug set the raw response is handed to
// printResponseJSON instead. When the response carries an error, only its
// message is printed; otherwise the header is followed by one row per event
// in the order the response lists them.
func printPodGetEventsResponse(r *task.GetPodEventsResponse, debug bool) {
	defer tabWriter.Flush()

	if debug {
		printResponseJSON(r)
		return
	}

	if respErr := r.GetError(); respErr != nil {
		fmt.Fprintf(tabWriter, "Got event error: %s\n", respErr.GetMessage())
		return
	}

	fmt.Fprint(tabWriter, podEventsFormatHeader)
	for _, ev := range r.GetResult() {
		mesosTaskID := ev.GetTaskId().GetValue()
		desiredMesosTaskID := ev.GetDesriedTaskId().GetValue()

		fmt.Fprintf(
			tabWriter,
			podEventsFormatBody,
			mesosTaskID,
			desiredMesosTaskID,
			ev.GetActualState(),
			ev.GetGoalState(),
			ev.GetConfigVersion(),
			ev.GetDesiredConfigVersion(),
			ev.GetHealthy(),
			ev.GetHostname(),
			ev.GetMessage(),
			ev.GetReason(),
			ev.GetTimestamp(),
		)
	}
}
// printTaskListResponse writes the tasks of a List response to tabWriter as
// a table sorted by instance ID, then flushes it. With debug set the raw
// response is handed to printResponseJSON instead. When the job was not
// found only that message is printed.
//
// The not-found details are read through the generated getters so that an
// unset job ID prints as empty instead of causing a nil dereference.
func printTaskListResponse(r *task.ListResponse, debug bool) {
	defer tabWriter.Flush()

	if debug {
		printResponseJSON(r)
		return
	}

	if notFound := r.GetNotFound(); notFound != nil {
		fmt.Fprintf(tabWriter, "Job %s was not found: %s\n",
			notFound.GetId().GetValue(), notFound.GetMessage())
		return
	}

	fmt.Fprint(tabWriter, taskListFormatHeader)

	// Copy the tasks out of the response so they can be shown in sorted
	// order without reordering the response itself.
	results := r.GetResult().GetValue()
	tasks := make(sortedTaskInfoList, 0, len(results))
	for _, info := range results {
		tasks = append(tasks, info)
	}
	sort.Sort(tasks)

	for _, t := range tasks {
		printTask(t)
	}
}
// printTaskQueryResponse writes the records of a Query response to tabWriter
// as a table, in the order the response lists them, then flushes it. With
// debug set the raw response is handed to printResponseJSON instead. A
// job-not-found error or an empty record list is reported with a one-line
// message in place of the table.
//
// The not-found details are read through the generated getters so that an
// unset job ID prints as empty instead of causing a nil dereference.
func printTaskQueryResponse(r *task.QueryResponse, debug bool) {
	defer tabWriter.Flush()

	if debug {
		printResponseJSON(r)
		return
	}

	if notFound := r.GetError().GetNotFound(); notFound != nil {
		fmt.Fprintf(tabWriter, "Job %s was not found: %s\n",
			notFound.GetId().GetValue(), notFound.GetMessage())
		return
	}

	records := r.GetRecords()
	if len(records) == 0 {
		fmt.Fprint(tabWriter, "No tasks found\n")
		return
	}

	fmt.Fprint(tabWriter, taskListFormatHeader)
	for _, t := range records {
		printTask(t)
	}
}
// printTaskStartResponse writes the outcome of a task Start call to
// tabWriter and flushes it. With debug set the raw response is handed to
// printResponseJSON instead.
//
// Without an error the started and invalid instance IDs are listed.
// Otherwise the first matching error kind is reported: job not found,
// instances out of range, or goal state update failure. An error that
// matches none of these is printed as-is rather than being dropped silently.
func printTaskStartResponse(r *task.StartResponse, debug bool) {
	defer tabWriter.Flush()

	if debug {
		printResponseJSON(r)
		return
	}

	respError := r.GetError()
	if respError == nil {
		fmt.Fprintf(
			tabWriter,
			"Tasks started successfully for instances: %v and "+
				"failed instances are: %v\n",
			r.GetStartedInstanceIds(),
			r.GetInvalidInstanceIds(),
		)
		return
	}

	switch {
	case respError.GetNotFound() != nil:
		fmt.Fprintf(
			tabWriter,
			"Job %s was not found: %s\n",
			respError.GetNotFound().GetId().GetValue(),
			respError.GetNotFound().GetMessage(),
		)
	case respError.GetOutOfRange() != nil:
		fmt.Fprintf(
			tabWriter,
			"Requested instances:%d of job %s is not within "+
				"the range of valid instances (0...%d)\n",
			r.GetInvalidInstanceIds(),
			respError.GetOutOfRange().GetJobId().GetValue(),
			respError.GetOutOfRange().GetInstanceCount(),
		)
	case respError.GetFailure() != nil:
		// This is the start path; the message used to say "stop".
		fmt.Fprintf(
			tabWriter,
			"Tasks start goalstate update in DB got error: %s\n",
			respError.GetFailure().GetMessage(),
		)
	default:
		fmt.Fprintf(
			tabWriter,
			"Tasks start failed with error: %v\n",
			respError,
		)
	}
}
// printTaskStopResponse writes the outcome of a task Stop call to tabWriter
// and flushes it. With debug set the raw response is handed to
// printResponseJSON instead.
//
// Without an error it prints either the number of instances signalled for
// stopping or, when some instance IDs were reported invalid, that list. With
// an error the first matching kind is reported: job not found, instances out
// of range, or update error. Any other error prints nothing.
func printTaskStopResponse(r *task.StopResponse, debug bool) {
	defer tabWriter.Flush()

	if debug {
		printResponseJSON(r)
		return
	}

	stopErr := r.GetError()
	invalidIDs := r.GetInvalidInstanceIds()

	switch {
	case stopErr == nil && len(invalidIDs) == 0:
		fmt.Fprintf(
			tabWriter, "Successfully signalled %v tasks for stopping\n",
			len(r.GetStoppedInstanceIds()))

	case stopErr == nil:
		fmt.Fprintf(
			tabWriter,
			"Some tasks failed to stop, failed_instance_ids:%v\n",
			invalidIDs,
		)

	case stopErr.GetNotFound() != nil:
		notFound := stopErr.GetNotFound()
		fmt.Fprintf(
			tabWriter,
			"Job %s was not found: %s\n",
			notFound.GetId().GetValue(),
			notFound.GetMessage(),
		)

	case stopErr.GetOutOfRange() != nil:
		outOfRange := stopErr.GetOutOfRange()
		fmt.Fprintf(
			tabWriter,
			"Requested instances:%d of job %s is not within "+
				"the range of valid instances (0...%d)\n",
			invalidIDs,
			outOfRange.GetJobId().GetValue(),
			outOfRange.GetInstanceCount(),
		)

	case stopErr.GetUpdateError() != nil:
		fmt.Fprintf(
			tabWriter,
			"Tasks stop failed with error: %s\n",
			stopErr.GetUpdateError(),
		)
	}
}
// printTaskRestartResponse writes the outcome of a task Restart call to
// tabWriter and flushes it. With debug set the raw response is handed to
// printResponseJSON instead. Otherwise it prints the job-not-found message,
// the instance-out-of-range message, or "Job restarted" when the response
// carries neither error.
//
// Nested response fields are read through the generated getters so that an
// unset sub-message prints as empty instead of causing a nil dereference.
func printTaskRestartResponse(r *task.RestartResponse, debug bool) {
	defer tabWriter.Flush()

	if debug {
		printResponseJSON(r)
		return
	}

	if notFound := r.GetNotFound(); notFound != nil {
		fmt.Fprintf(tabWriter, "Job %s was not found: %s\n",
			notFound.GetId().GetValue(), notFound.GetMessage())
		return
	}

	if outOfRange := r.GetOutOfRange(); outOfRange != nil {
		fmt.Fprintf(tabWriter, "Requested instance of job %s is not "+
			"within the range of valid instances (0...%d)\n",
			outOfRange.GetJobId().GetValue(), outOfRange.GetInstanceCount())
		return
	}

	fmt.Fprint(tabWriter, "Job restarted\n")
}