pkg/common/util/util.go (471 lines of code) (raw):
// Copyright (c) 2019 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package util
import (
"encoding/json"
"fmt"
"math"
"reflect"
"sort"
"strconv"
"strings"
"time"
"github.com/pborman/uuid"
log "github.com/sirupsen/logrus"
"go.uber.org/yarpc/yarpcerrors"
mesos "github.com/uber/peloton/.gen/mesos/v1"
"github.com/uber/peloton/.gen/peloton/api/v0/job"
"github.com/uber/peloton/.gen/peloton/api/v0/peloton"
"github.com/uber/peloton/.gen/peloton/api/v0/task"
v1alphapeloton "github.com/uber/peloton/.gen/peloton/api/v1alpha/peloton"
"github.com/uber/peloton/.gen/peloton/api/v1alpha/pod"
"github.com/uber/peloton/.gen/peloton/private/hostmgr/hostsvc"
pbhostmgr "github.com/uber/peloton/.gen/peloton/private/hostmgr/v1alpha"
"github.com/uber/peloton/pkg/common"
)
const (
// ResourceEpsilon is the minimum epsilon mesos resource;
// This is because Mesos internally uses a fixed point precision. See MESOS-4687 for details.
ResourceEpsilon = 0.0009
)
// UUIDLength represents the length of a 16 byte v4 UUID as a string
var UUIDLength = len(uuid.New())
// Min returns the minimum value of x, y
func Min(x, y uint32) uint32 {
if x < y {
return x
}
return y
}
// Max returns the maximum value of x, y
func Max(x, y uint32) uint32 {
if x > y {
return x
}
return y
}
// PtrPrintf returns a pointer to a string format
func PtrPrintf(format string, a ...interface{}) *string {
str := fmt.Sprintf(format, a...)
return &str
}
// SubtractSlice returns the result of slice1 - slice2
// if an element is in slice2 but not in slice1, it would be ignored
func SubtractSlice(slice1 []uint32, slice2 []uint32) []uint32 {
if slice1 == nil {
return nil
}
var result []uint32
slice2Set := make(map[uint32]bool)
for _, v := range slice2 {
slice2Set[v] = true
}
for _, v := range slice1 {
if !slice2Set[v] {
result = append(result, v)
}
}
return result
}
// IntersectSlice get return the result of slice1 ∩ slice2
// element must be present in both slices
func IntersectSlice(slice1 []uint32, slice2 []uint32) []uint32 {
if slice1 == nil {
return nil
}
var result []uint32
slice2Set := make(map[uint32]bool)
for _, v := range slice2 {
slice2Set[v] = true
}
for _, v := range slice1 {
if slice2Set[v] {
result = append(result, v)
}
}
return result
}
// ConvertInstanceIDListToInstanceRange converts list
// of instance ids to list of instance ranges
func ConvertInstanceIDListToInstanceRange(instIDs []uint32) []*pod.InstanceIDRange {
var instanceIDRange []*pod.InstanceIDRange
var instanceRange *pod.InstanceIDRange
var prevInstID uint32
instIDSortLess := func(i, j int) bool {
return instIDs[i] < instIDs[j]
}
sort.Slice(instIDs, instIDSortLess)
for _, instID := range instIDs {
if instanceRange == nil {
// create a new range
instanceRange = &pod.InstanceIDRange{
From: instID,
}
} else {
// range already exists
if instID != prevInstID+1 {
// finish the previous range and start a new one
instanceRange.To = prevInstID
instanceIDRange = append(instanceIDRange, instanceRange)
instanceRange = &pod.InstanceIDRange{
From: instID,
}
}
}
prevInstID = instID
}
// finish the last instance range
if instanceRange != nil {
instanceRange.To = prevInstID
instanceIDRange = append(instanceIDRange, instanceRange)
}
return instanceIDRange
}
// GetOfferScalarResourceSummary generates a summary for all the scalar values: role -> offerName-> Value
// first level : role -> map(resource type-> resouce value)
func GetOfferScalarResourceSummary(offer *mesos.Offer) map[string]map[string]float64 {
var result = make(map[string]map[string]float64)
for _, resource := range offer.Resources {
if resource.Scalar != nil {
var role = "*"
if resource.Role != nil {
role = *resource.Role
}
if _, ok := result[role]; !ok {
result[role] = make(map[string]float64)
}
result[role][*resource.Name] = result[role][*resource.Name] + *resource.Scalar.Value
}
}
return result
}
// CreateMesosScalarResources is a helper function to convert resource values into Mesos resources.
func CreateMesosScalarResources(values map[string]float64, role string) []*mesos.Resource {
var rs []*mesos.Resource
for name, value := range values {
// Skip any value smaller than Espilon.
if math.Abs(value) < ResourceEpsilon {
continue
}
rs = append(rs, NewMesosResourceBuilder().WithName(name).WithValue(value).WithRole(role).Build())
}
return rs
}
// MesosResourceBuilder is the helper to build a mesos resource
type MesosResourceBuilder struct {
Resource mesos.Resource
}
// NewMesosResourceBuilder creates a MesosResourceBuilder
func NewMesosResourceBuilder() *MesosResourceBuilder {
defaultRole := "*"
defaultType := mesos.Value_SCALAR
return &MesosResourceBuilder{
Resource: mesos.Resource{
Role: &defaultRole,
Type: &defaultType,
},
}
}
// WithName sets name
func (o *MesosResourceBuilder) WithName(name string) *MesosResourceBuilder {
o.Resource.Name = &name
return o
}
// WithType sets type
func (o *MesosResourceBuilder) WithType(t mesos.Value_Type) *MesosResourceBuilder {
o.Resource.Type = &t
return o
}
// WithRole sets role
func (o *MesosResourceBuilder) WithRole(role string) *MesosResourceBuilder {
o.Resource.Role = &role
return o
}
// WithValue sets value
func (o *MesosResourceBuilder) WithValue(value float64) *MesosResourceBuilder {
scalarVal := mesos.Value_Scalar{
Value: &value,
}
o.Resource.Scalar = &scalarVal
return o
}
// WithRanges sets ranges
func (o *MesosResourceBuilder) WithRanges(ranges *mesos.Value_Ranges) *MesosResourceBuilder {
o.Resource.Ranges = ranges
return o
}
// WithReservation sets reservation info.
func (o *MesosResourceBuilder) WithReservation(
reservation *mesos.Resource_ReservationInfo) *MesosResourceBuilder {
o.Resource.Reservation = reservation
return o
}
// WithDisk sets disk info.
func (o *MesosResourceBuilder) WithDisk(
diskInfo *mesos.Resource_DiskInfo) *MesosResourceBuilder {
o.Resource.Disk = diskInfo
return o
}
// WithRevocable sets resource as revocable resource type
func (o *MesosResourceBuilder) WithRevocable(
revocable *mesos.Resource_RevocableInfo) *MesosResourceBuilder {
o.Resource.Revocable = revocable
return o
}
// TODO: add other building functions when needed
// Build returns the mesos resource
func (o *MesosResourceBuilder) Build() *mesos.Resource {
res := o.Resource
return &res
}
// MesosStateToPelotonState translates mesos task state to peloton task state
// TODO: adjust in case there are additional peloton states
func MesosStateToPelotonState(mstate mesos.TaskState) task.TaskState {
switch mstate {
case mesos.TaskState_TASK_STAGING:
return task.TaskState_LAUNCHED
case mesos.TaskState_TASK_STARTING:
return task.TaskState_STARTING
case mesos.TaskState_TASK_RUNNING:
return task.TaskState_RUNNING
// NOTE: This should only be sent when the framework has
// the TASK_KILLING_STATE capability.
case mesos.TaskState_TASK_KILLING:
return task.TaskState_RUNNING
case mesos.TaskState_TASK_FINISHED:
return task.TaskState_SUCCEEDED
case mesos.TaskState_TASK_FAILED:
return task.TaskState_FAILED
case mesos.TaskState_TASK_KILLED:
return task.TaskState_KILLED
case mesos.TaskState_TASK_LOST:
return task.TaskState_LOST
case mesos.TaskState_TASK_ERROR:
return task.TaskState_FAILED
default:
log.Errorf("Unknown mesos taskState %v", mstate)
return task.TaskState_INITIALIZED
}
}
// IsPelotonStateTerminal returns true if state is terminal
// otherwise false
func IsPelotonStateTerminal(state task.TaskState) bool {
switch state {
case task.TaskState_SUCCEEDED, task.TaskState_FAILED,
task.TaskState_KILLED, task.TaskState_LOST,
task.TaskState_DELETED:
return true
default:
return false
}
}
// IsTaskThrottled returns true if a task is currently
// throttled due to repeated failures
func IsTaskThrottled(state task.TaskState, message string) bool {
if IsPelotonStateTerminal(state) && message == common.TaskThrottleMessage {
return true
}
return false
}
// IsPelotonPodStateTerminal returns true if pod state is
// terminal otherwise false
func IsPelotonPodStateTerminal(state pod.PodState) bool {
switch state {
case pod.PodState_POD_STATE_SUCCEEDED, pod.PodState_POD_STATE_FAILED,
pod.PodState_POD_STATE_KILLED, pod.PodState_POD_STATE_LOST,
pod.PodState_POD_STATE_DELETED:
return true
default:
return false
}
}
// IsPelotonJobStateTerminal returns true if job state is terminal
// otherwise false
func IsPelotonJobStateTerminal(state job.JobState) bool {
switch state {
case job.JobState_SUCCEEDED, job.JobState_FAILED,
job.JobState_DELETED, job.JobState_KILLED:
return true
default:
return false
}
}
// IsTaskHasValidVolume returns true if a task is stateful and has a valid volume
func IsTaskHasValidVolume(taskInfo *task.TaskInfo) bool {
if taskInfo.GetConfig().GetVolume() != nil &&
len(taskInfo.GetRuntime().GetVolumeID().GetValue()) != 0 {
return true
}
return false
}
// CreateMesosTaskID creates mesos task id given jobID, instanceID and runID
func CreateMesosTaskID(jobID *peloton.JobID,
instanceID uint32,
runID uint64) *mesos.TaskID {
mesosID := fmt.Sprintf(
"%s-%d-%d",
jobID.GetValue(),
instanceID,
runID)
return &mesos.TaskID{Value: &mesosID}
}
// CreatePelotonTaskID creates a PelotonTaskID given jobID and instanceID
func CreatePelotonTaskID(
jobID string,
instanceID uint32,
) string {
return fmt.Sprintf("%s-%d", jobID, instanceID)
}
// CreatePodIDFromMesosTaskID creates a peloton pod ID from mesos taskID.
func CreatePodIDFromMesosTaskID(t *mesos.TaskID) *v1alphapeloton.PodID {
return &v1alphapeloton.PodID{
Value: t.GetValue(),
}
}
// CreateLeaseIDFromHostOfferID creates a LeaseId from host offer ID.
func CreateLeaseIDFromHostOfferID(id *peloton.HostOfferID) *pbhostmgr.LeaseID {
return &pbhostmgr.LeaseID{
Value: id.GetValue(),
}
}
// ParseRunID parse the runID from mesosTaskID
func ParseRunID(mesosTaskID string) (uint64, error) {
splitMesosTaskID := strings.Split(mesosTaskID, "-")
if len(mesosTaskID) == 0 { // prev mesos task id is nil
return 0,
yarpcerrors.InvalidArgumentErrorf(
"mesosTaskID provided is empty",
)
} else if len(splitMesosTaskID) == 7 {
if runID, err := strconv.ParseUint(
splitMesosTaskID[len(splitMesosTaskID)-1], 10, 64); err == nil {
return runID, nil
}
}
return 0,
yarpcerrors.InvalidArgumentErrorf(
"unable to parse mesos task id: %v",
mesosTaskID,
)
}
// ParseTaskID parses the jobID and instanceID from peloton taskID
func ParseTaskID(taskID string) (string, uint32, error) {
pos := strings.LastIndex(taskID, "-")
if len(taskID) < UUIDLength || pos == -1 {
return "", 0,
yarpcerrors.InvalidArgumentErrorf("invalid pelotonTaskID %v", taskID)
}
jobID := taskID[0:pos]
ins := taskID[pos+1:]
instanceID, err := strconv.ParseUint(ins, 10, 32)
if err != nil {
log.WithFields(log.Fields{
"task_id": taskID,
"job_id": jobID,
}).WithError(err).Error("failed to parse taskID")
log.Info(err)
return "",
0,
yarpcerrors.InvalidArgumentErrorf("unable to parse instanceID %v", taskID)
}
return jobID, uint32(instanceID), nil
}
// ParseTaskIDFromMesosTaskID parses the taskID from mesosTaskID
func ParseTaskIDFromMesosTaskID(mesosTaskID string) (string, error) {
// mesos task id would be "(jobID)-(instanceID)-(runID)" form
if len(mesosTaskID) < UUIDLength+1 {
return "", yarpcerrors.InvalidArgumentErrorf("invalid mesostaskID %v", mesosTaskID)
}
// TODO: deprecate the check once mesos task id migration is complete from
// uuid-int-uuid -> uuid(job ID)-int(instance ID)-int(monotonically incremental)
// If uuid has all digits from uuid-int-uuid then it will increment from
// that value and not default to 1.
var pelotonTaskID string
if len(mesosTaskID) > 2*UUIDLength {
pelotonTaskID = mesosTaskID[:len(mesosTaskID)-(UUIDLength+1)]
} else {
pelotonTaskID = mesosTaskID[:strings.LastIndex(mesosTaskID, "-")]
}
_, _, err := ParseTaskID(pelotonTaskID)
if err != nil {
return "", err
}
return pelotonTaskID, nil
}
// ParseJobAndInstanceID return jobID and instanceID from given mesos task id.
func ParseJobAndInstanceID(mesosTaskID string) (string, uint32, error) {
pelotonTaskID, err := ParseTaskIDFromMesosTaskID(mesosTaskID)
if err != nil {
return "", 0, err
}
return ParseTaskID(pelotonTaskID)
}
// UnmarshalToType unmarshal a string to a typed interface{}
func UnmarshalToType(jsonString string, resultType reflect.Type) (interface{},
error) {
result := reflect.New(resultType)
err := json.Unmarshal([]byte(jsonString), result.Interface())
if err != nil {
log.Errorf("Unmarshal failed with error %v, type %v, jsonString %v",
err, resultType, jsonString)
return nil, err
}
return result.Interface(), nil
}
// ConvertLabels will convert Peloton labels to Mesos labels.
func ConvertLabels(pelotonLabels []*peloton.Label) *mesos.Labels {
mesosLabels := &mesos.Labels{
Labels: make([]*mesos.Label, 0, len(pelotonLabels)),
}
for _, label := range pelotonLabels {
mesosLabel := &mesos.Label{
Key: &label.Key,
Value: &label.Value,
}
mesosLabels.Labels = append(mesosLabels.Labels, mesosLabel)
}
return mesosLabels
}
// Contains checks whether an item contains in a list
func Contains(list []string, item string) bool {
for _, name := range list {
if name == item {
return true
}
}
return false
}
// ContainsTaskState checks whether a TaskState contains in a list of TaskStates
func ContainsTaskState(list []task.TaskState, item task.TaskState) bool {
for _, name := range list {
if name == item {
return true
}
}
return false
}
// CreateHostInfo takes the agent Info and create the hostsvc.HostInfo
func CreateHostInfo(hostname string,
agentInfo *mesos.AgentInfo) *hostsvc.HostInfo {
// if agentInfo is nil , return nil HostInfo
if agentInfo == nil {
log.WithField("host", hostname).
Warn("Agent Info is nil")
return nil
}
// return the HostInfo object
return &hostsvc.HostInfo{
Hostname: hostname,
AgentId: agentInfo.Id,
Attributes: agentInfo.Attributes,
Resources: agentInfo.Resources,
}
}
// CreateSecretVolume builds a mesos volume of type secret
// from the given secret path and secret value string
// This volume will be added to the job's default config
func CreateSecretVolume(secretPath string, secretStr string) *mesos.Volume {
volumeMode := mesos.Volume_RO
volumeSourceType := mesos.Volume_Source_SECRET
secretType := mesos.Secret_VALUE
return &mesos.Volume{
Mode: &volumeMode,
ContainerPath: &secretPath,
Source: &mesos.Volume_Source{
Type: &volumeSourceType,
Secret: &mesos.Secret{
Type: &secretType,
Value: &mesos.Secret_Value{
Data: []byte(secretStr),
},
},
},
}
}
// IsSecretVolume returns true if the given volume is of type secret
func IsSecretVolume(volume *mesos.Volume) bool {
return volume.GetSource().GetType() == mesos.Volume_Source_SECRET
}
// ConfigHasSecretVolumes returns true if config contains secret volumes
func ConfigHasSecretVolumes(config *task.TaskConfig) bool {
for _, v := range config.GetContainer().GetVolumes() {
if ok := IsSecretVolume(v); ok {
return true
}
}
return false
}
// RemoveSecretVolumesFromConfig removes secret volumes from the task config
// in place and returns the secret volumes
// Secret volumes are added internally at the time of creating a job with
// secrets by handleSecrets method. They are not supplied in the config in
// job create/update requests. Consequently, they should not be displayed
// as part of Job Get API response. This is necessary to achieve the broader
// goal of using the secrets proto message in Job Create/Update/Get API to
// describe secrets and not allow users to checkin secrets as part of config
func RemoveSecretVolumesFromConfig(config *task.TaskConfig) []*mesos.Volume {
if config.GetContainer().GetVolumes() == nil {
return nil
}
secretVolumes := []*mesos.Volume{}
volumes := []*mesos.Volume{}
for _, volume := range config.GetContainer().GetVolumes() {
if ok := IsSecretVolume(volume); ok {
secretVolumes = append(secretVolumes, volume)
} else {
volumes = append(volumes, volume)
}
}
config.GetContainer().Volumes = nil
if len(volumes) > 0 {
config.GetContainer().Volumes = volumes
}
return secretVolumes
}
// RemoveSecretVolumesFromJobConfig removes secret volumes from the default
// config as well as instance config in place and returns the secret volumes
func RemoveSecretVolumesFromJobConfig(cfg *job.JobConfig) []*mesos.Volume {
// remove secret volumes if present from default config
secretVolumes := RemoveSecretVolumesFromConfig(cfg.GetDefaultConfig())
// remove secret volumes if present from instance config
for _, config := range cfg.GetInstanceConfig() {
// instance config contains the same secret volumes as default config,
// so no need to operate on them
_ = RemoveSecretVolumesFromConfig(config)
}
return secretVolumes
}
// ConvertTimestampToUnixSeconds converts timestamp string in RFC3339 format
// to the unix time in seconds.
func ConvertTimestampToUnixSeconds(timestamp string) (int64, error) {
ts, err := time.Parse(time.RFC3339, timestamp)
if err != nil {
return 0, err
}
return ts.Unix(), nil
}
// GetDereferencedJobIDsList dereferences the jobIDs list
func GetDereferencedJobIDsList(jobIDs []*peloton.JobID) []peloton.JobID {
result := []peloton.JobID{}
for _, jobID := range jobIDs {
result = append(result, *jobID)
}
return result
}
// FormatTime converts a Unix timestamp to a string format of the
// given layout in UTC. See https://golang.org/pkg/time/ for possible
// time layout in golang. For example, it will return RFC3339 format
// string like 2017-01-02T11:00:00.123456789Z if the layout is
// time.RFC3339Nano
func FormatTime(timestamp float64, layout string) string {
seconds := int64(timestamp)
nanoSec := int64((timestamp - float64(seconds)) *
float64(time.Second/time.Nanosecond))
return time.Unix(seconds, nanoSec).UTC().Format(layout)
}