pkg/cli/host_actions.go (210 lines of code) (raw):
// Copyright (c) 2019 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package cli
import (
"fmt"
"sort"
"strings"
host "github.com/uber/peloton/.gen/peloton/api/v0/host"
host_svc "github.com/uber/peloton/.gen/peloton/api/v0/host/svc"
pb_task "github.com/uber/peloton/.gen/peloton/api/v0/task"
"github.com/uber/peloton/.gen/peloton/private/hostmgr/hostsvc"
host_svc_v1 "github.com/uber/peloton/.gen/peloton/private/hostmgr/v1alpha/svc"
"github.com/uber/peloton/pkg/hostmgr/scalar"
)
// Output templates and separators used by the host CLI actions in this file.
// All templates are tab-separated and are written to tabWriter.
const (
// hostQueryFormatHeader and hostQueryFormatBody render the table printed by
// printHostQueryResponse: hostname, IP, state and current host pool.
hostQueryFormatHeader = "Hostname\tIP\tState\tHostPool\n"
hostQueryFormatBody = "%s\t%s\t%s\t%s\n"
// hostSeparator splits the state list in HostQueryAction and is passed to
// ExtractHostnames in HostsGetAction.
hostSeparator = ","
// getHostsFormatHeader and getHostsFormatBody render the table printed by
// printGetHostsResponse; the body takes the hostname, four float resource
// values, the status, and the held-task and running-task strings.
getHostsFormatHeader = "Hostname\tCPU\tGPU\tMEM\tDisk\tState\t Task Hold\t Task Running\n"
getHostsFormatBody = "%s\t%.2f\t%.2f\t%.2f MB\t%.2f MB\t%s\t%s\t%s\n"
// hostCacheFormatHeader and hostCacheFormatBody render the table printed by
// HostCacheDump; each resource column is filled with an
// "allocation/capacity" pair of floats.
hostCacheFormatHeader = "Hostname\tCPU\tGPU\tMEM\tDisk\tStatus\n"
hostCacheFormatBody = "%s\t%.2f/%.2f\t%.2f/%.2f\t%.2f/%.2f MB\t%.2f/%.2f MB\t%s\n"
)
// HostCacheDump dumps the contents of the host cache.
//
// It fetches the cache through the v1 host manager client and prints one row
// per host summary: the hostname, four "allocation/capacity" pairs read from
// indexes 0-3 of the summary's Allocation and Capacity lists (shown under the
// CPU, GPU, MEM and Disk columns), and the status.
//
// An RPC error is returned unchanged. A summary carrying fewer than four
// allocation or capacity entries produces an error rather than an
// index-out-of-range panic; rows written before it are still flushed.
func (c *Client) HostCacheDump() error {
	resp, err := c.hostMgrClientV1.GetHostCache(c.ctx,
		&host_svc_v1.GetHostCacheRequest{})
	if err != nil {
		return err
	}

	// Flush on every exit path below, including the error return in the loop.
	defer tabWriter.Flush()

	if len(resp.Summaries) == 0 {
		fmt.Fprintf(tabWriter, "HostCache empty.\n")
		return nil
	}

	// Number of entries each row reads from both resource lists.
	const resourceColumns = 4

	fmt.Fprintf(tabWriter, "Hostname\tcontents:\n")
	fmt.Fprintf(tabWriter, hostCacheFormatHeader)
	for _, summary := range resp.Summaries {
		// Guard the fixed indexes used below so a short list cannot panic.
		if len(summary.Allocation) < resourceColumns ||
			len(summary.Capacity) < resourceColumns {
			return fmt.Errorf(
				"host %q: incomplete resource summary: %d allocation and %d capacity entries, need %d of each",
				summary.GetHostname(),
				len(summary.Allocation),
				len(summary.Capacity),
				resourceColumns,
			)
		}
		fmt.Fprintf(tabWriter,
			hostCacheFormatBody,
			summary.GetHostname(),
			summary.Allocation[0].Capacity,
			summary.Capacity[0].Capacity,
			summary.Allocation[1].Capacity,
			summary.Capacity[1].Capacity,
			summary.Allocation[2].Capacity,
			summary.Capacity[2].Capacity,
			summary.Allocation[3].Capacity,
			summary.Capacity[3].Capacity,
			summary.Status,
		)
	}
	return nil
}
// HostMaintenanceStartAction submits a host for maintenance.
//
// StartMaintenance puts the host into DRAINING state by posting a maintenance
// schedule to Mesos Master. Inverse offers are sent out and all future offers
// from the host are tagged with unavailability (see the Mesos Maintenance
// Primitives for details). The host is first drained of tasks and is then put
// into maintenance by posting to the /machine/down endpoint of Mesos Master,
// so it transitions from UP to DRAINING and finally to DOWN.
//
// An empty hostname is rejected before any request is sent; an error from the
// StartMaintenance call is returned unchanged. On success the hostname from
// the response is printed.
func (c *Client) HostMaintenanceStartAction(hostname string) error {
	if hostname == "" {
		return fmt.Errorf("Empty hostname")
	}

	resp, err := c.hostClient.StartMaintenance(
		c.ctx,
		&host_svc.StartMaintenanceRequest{Hostname: hostname},
	)
	if err != nil {
		return err
	}

	defer tabWriter.Flush()
	fmt.Fprintf(tabWriter, "Host successfully submitted for maintenance: %s\n", resp.GetHostname())
	return nil
}
// HostMaintenanceCompleteAction submits a host for maintenance completion.
//
// Completing maintenance brings UP a host that is in maintenance by posting
// to the /machine/up endpoint of Mesos Master, i.e. the machine transitions
// from DOWN to UP (see the Mesos Maintenance Primitives for details).
//
// An empty hostname is rejected before any request is sent; an error from the
// CompleteMaintenance call is returned unchanged. On success the hostname
// from the response is printed.
func (c *Client) HostMaintenanceCompleteAction(hostname string) error {
	if hostname == "" {
		return fmt.Errorf("Missing hostname")
	}

	req := host_svc.CompleteMaintenanceRequest{Hostname: hostname}
	resp, err := c.hostClient.CompleteMaintenance(c.ctx, &req)
	if err != nil {
		return err
	}

	const successFormat = "Host successfully submitted for maintenance completion: %s\n"
	fmt.Fprintf(tabWriter, successFormat, resp.GetHostname())
	tabWriter.Flush()
	return nil
}
// HostQueryAction is the action for querying hosts by states. It can be used
// to monitor the state of the host(s), e.g. after a list of hosts has been put
// into maintenance (`host maintenance start`).
//
// A host, at any given time, will be in one of the following states:
//  1. HostState_HOST_STATE_UP - The host is up and running
//  2. HostState_HOST_STATE_DRAINING - The tasks running on the host are being
//     rescheduled and there will be no further placement of tasks on the host
//  3. HostState_HOST_STATE_DRAINED - There are no tasks running on this host
//     and it is ready to be 'DOWN'ed
//  4. HostState_HOST_STATE_DOWN - The host is in maintenance.
//
// states is a comma-separated list of state names as they appear as keys in
// host.HostState_value; empty entries are skipped, so an empty string sends a
// request with no state filter. A name that is not a key of that map is
// reported as an error instead of being silently sent as enum value 0.
// An error from the QueryHosts call is returned unchanged.
func (c *Client) HostQueryAction(states string) error {
	var hostStates []host.HostState
	for _, name := range strings.Split(states, hostSeparator) {
		if name == "" {
			continue
		}
		// A plain index on the map would yield 0 for a misspelled name;
		// use the comma-ok form so typos are surfaced to the user.
		value, ok := host.HostState_value[name]
		if !ok {
			return fmt.Errorf("unknown host state %q", name)
		}
		hostStates = append(hostStates, host.HostState(value))
	}

	request := &host_svc.QueryHostsRequest{
		HostStates: hostStates,
	}
	response, err := c.hostClient.QueryHosts(c.ctx, request)
	if err != nil {
		return err
	}

	printHostQueryResponse(response, c.Debug)
	return nil
}
// printHostQueryResponse writes a QueryHosts response to tabWriter.
//
// With debug set, the response is handed to printResponseJSON. Otherwise it
// prints "No hosts found" when the response carries no host infos, or a table
// with one row per host (hostname, IP, state, current pool).
//
// tabWriter is flushed on every path; previously the "No hosts found" branch
// returned before the flush, leaving that message buffered.
func printHostQueryResponse(r *host_svc.QueryHostsResponse, debug bool) {
	defer tabWriter.Flush()

	if debug {
		printResponseJSON(r)
		return
	}

	hostInfos := r.GetHostInfos()
	if len(hostInfos) == 0 {
		fmt.Fprintf(tabWriter, "No hosts found\n")
		return
	}

	fmt.Fprintf(tabWriter, hostQueryFormatHeader)
	for _, h := range hostInfos {
		fmt.Fprintf(
			tabWriter,
			hostQueryFormatBody,
			h.GetHostname(),
			h.GetIp(),
			h.GetState(),
			h.GetCurrentPool(),
		)
	}
}
// HostsGetAction prints all the hosts based on the resource requirement
// passed in.
//
// cpu, gpu, mem and disk populate the CpuLimit, GpuLimit, MemLimitMb and
// DiskLimitMb fields of the resource config sent with the query. cmpLess and
// revocable are forwarded as the request's CmpLess and IncludeRevocable
// fields. hosts, when non-empty, is parsed with ExtractHostnames using the
// "," separator and the resulting names are sent as the Hostnames filter.
//
// Errors from ExtractHostnames and from the GetHostsByQuery call are
// returned; the latter was previously discarded, which made a failed RPC
// indistinguishable from an empty result.
func (c *Client) HostsGetAction(
	cpu float64,
	gpu float64,
	mem float64,
	disk float64,
	cmpLess bool,
	hosts string,
	revocable bool,
) error {
	var hostnames []string
	if len(hosts) > 0 {
		var err error
		hostnames, err = c.ExtractHostnames(hosts, hostSeparator)
		if err != nil {
			return err
		}
	}

	resourceConfig := &pb_task.ResourceConfig{
		CpuLimit:    cpu,
		GpuLimit:    gpu,
		MemLimitMb:  mem,
		DiskLimitMb: disk,
	}

	resp, err := c.hostMgrClient.GetHostsByQuery(
		c.ctx,
		&hostsvc.GetHostsByQueryRequest{
			Resource:         resourceConfig,
			CmpLess:          cmpLess,
			Hostnames:        hostnames,
			IncludeRevocable: revocable,
		})
	if err != nil {
		return err
	}

	printGetHostsResponse(resp)
	return nil
}
// printGetHostsResponse writes the hosts of a GetHostsByQuery response to
// tabWriter and flushes it.
//
// When the response has no hosts a single notice line is printed. Otherwise
// the hosts are sorted in place by hostname (ascending) and printed one per
// row with their CPU, GPU, memory and disk values (derived from the host's
// resources via scalar.FromMesosResources), status, held tasks and tasks.
func printGetHostsResponse(resp *hostsvc.GetHostsByQueryResponse) {
	defer tabWriter.Flush()

	matched := resp.GetHosts()
	if len(matched) == 0 {
		fmt.Fprintln(tabWriter, "No hosts found satisfies the requirement")
		return
	}

	sort.Slice(matched, func(a, b int) bool {
		return matched[a].GetHostname() < matched[b].GetHostname()
	})

	fmt.Fprint(tabWriter, getHostsFormatHeader)
	for _, h := range matched {
		res := scalar.FromMesosResources(h.GetResources())
		fmt.Fprintf(tabWriter,
			getHostsFormatBody,
			h.GetHostname(),
			res.GetCPU(),
			res.GetGPU(),
			res.GetMem(),
			res.GetDisk(),
			h.GetStatus(),
			getTaskHeldString(h),
			getTasksString(h),
		)
	}
}
// getTaskHeldString returns the values of the host's held tasks joined by a
// single space, or the empty string when the host has no held tasks.
//
// strings.Join replaces the previous concatenate-then-trim loop: the output
// is the same, without re-copying the accumulated string on every iteration.
func getTaskHeldString(host *hostsvc.GetHostsByQueryResponse_Host) string {
	heldTasks := host.GetHeldTasks()
	values := make([]string, 0, len(heldTasks))
	for _, taskHeld := range heldTasks {
		values = append(values, taskHeld.GetValue())
	}
	return strings.Join(values, " ")
}
// getTasksString returns the values of the host's tasks joined by a single
// space, or the empty string when the host has no tasks.
//
// strings.Join replaces the previous concatenate-then-trim loop: the output
// is the same, without re-copying the accumulated string on every iteration.
func getTasksString(host *hostsvc.GetHostsByQueryResponse_Host) string {
	tasks := host.GetTasks()
	values := make([]string, 0, len(tasks))
	for _, task := range tasks {
		values = append(values, task.GetValue())
	}
	return strings.Join(values, " ")
}
// DisableKillTasksAction disables the kill task request to mesos master by
// calling DisableKillTasks on the host manager client.
//
// An error from the call is returned unchanged; on success a confirmation
// line is printed and tabWriter is flushed.
func (c *Client) DisableKillTasksAction() error {
	req := &hostsvc.DisableKillTasksRequest{}
	if _, err := c.hostMgrClient.DisableKillTasks(c.ctx, req); err != nil {
		return err
	}

	defer tabWriter.Flush()
	fmt.Fprintf(tabWriter, "Disabled kill tasks request\n")
	return nil
}