pkg/jobmgr/cached/task.go (540 lines of code) (raw):
// Copyright (c) 2019 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package cached
import (
"context"
"sync"
"time"
mesos "github.com/uber/peloton/.gen/mesos/v1"
pbjob "github.com/uber/peloton/.gen/peloton/api/v0/job"
"github.com/uber/peloton/.gen/peloton/api/v0/peloton"
pbtask "github.com/uber/peloton/.gen/peloton/api/v0/task"
"github.com/uber/peloton/pkg/common/util"
jobmgrcommon "github.com/uber/peloton/pkg/jobmgr/common"
"github.com/golang/protobuf/proto"
"github.com/pborman/uuid"
log "github.com/sirupsen/logrus"
"go.uber.org/yarpc/yarpcerrors"
)
// uuidLength is the length of the value returned by uuid.New().
// validateMesosTaskID compares mesos task id lengths against twice this
// value before attempting to parse a run id out of them.
var uuidLength = len(uuid.New())
// IsResMgrOwnedState reports whether state is listed in
// resMgrOwnedTaskStates, i.e. the task is either waiting for admission,
// being placed or being preempted.
func IsResMgrOwnedState(state pbtask.TaskState) bool {
	_, owned := resMgrOwnedTaskStates[state]
	return owned
}
// IsMesosOwnedState reports whether state is listed in
// mesosOwnedTaskStates, i.e. the task is present in mesos.
func IsMesosOwnedState(state pbtask.TaskState) bool {
	_, owned := mesosOwnedTaskStates[state]
	return owned
}
// Task exposes the accessors of a task held in the cache.
type Task interface {
	// ID returns the instance identifier of the task.
	ID() uint32

	// JobID returns the identifier of the job the task belongs to.
	JobID() *peloton.JobID

	// GetRuntime returns the task runtime. The implementation in this
	// file returns a copy and loads the runtime from the DB first when the
	// cache does not hold one.
	GetRuntime(ctx context.Context) (*pbtask.RuntimeInfo, error)

	// GetCacheRuntime returns the task runtime stored in the cache.
	// It returns nil if there is no runtime in the cache.
	GetCacheRuntime() *pbtask.RuntimeInfo

	// GetLabels returns the task labels.
	GetLabels(ctx context.Context) ([]*peloton.Label, error)

	// CurrentState returns the current state vector of the task.
	CurrentState() TaskStateVector

	// GoalState returns the goal state vector of the task.
	GoalState() TaskStateVector

	// StateSummary returns the current, goal and health state of the task.
	StateSummary() TaskStateSummary

	// TerminationStatus returns the termination status of the task.
	TerminationStatus() *pbtask.TerminationStatus
}
// TaskStateVector defines the state of a task.
// The same structure is used to describe both the actual state
// (see CurrentState) and the goal state (see GoalState).
type TaskStateVector struct {
	// State is the task state (actual or goal).
	State pbtask.TaskState
	// ConfigVersion is the configuration version (actual or desired).
	ConfigVersion uint64
	// MesosTaskID is the mesos task identifier (actual or desired).
	MesosTaskID *mesos.TaskID
}
// TaskStateSummary bundles the current state, the goal state and the
// health state of a task, as returned by Task.StateSummary.
type TaskStateSummary struct {
	// CurrentState is the actual state of the task.
	CurrentState pbtask.TaskState
	// GoalState is the state the task is expected to reach.
	GoalState pbtask.TaskState
	// HealthState is the health state reported in the task runtime.
	HealthState pbtask.HealthState
}
// newTask builds the cache entry for instance id of job jobID.
// The runtime and config of the returned task are left empty; they are
// populated later by the create/patch/replace operations.
func newTask(jobID *peloton.JobID, id uint32, jobFactory *jobFactory, jobType pbjob.JobType) *task {
	return &task{
		jobFactory: jobFactory,
		jobType:    jobType,
		jobID:      jobID,
		id:         id,
	}
}
// taskConfigCache is the structure which defines the
// subset of task configuration to be stored in the cache.
type taskConfigCache struct {
	configVersion uint64 // the configuration version the cached values were read from
	labels []*peloton.Label // task labels
	revocable bool // whether task uses revocable resources; selects the metric recorded in logStateTransitionMetrics
}
// task structure holds the information about a given task in the cache.
type task struct {
	sync.RWMutex // Mutex to acquire before accessing any task information in cache
	jobID *peloton.JobID // Parent job identifier
	id uint32 // instance identifier
	jobType pbjob.JobType // Parent job type
	jobFactory *jobFactory // Pointer to the parent job factory object
	runtime *pbtask.RuntimeInfo // task runtime information; nil when nothing is cached
	config *taskConfigCache // task configuration information; nil when nothing is cached
	initializedAt time.Time // Task initialization timestamp, set when a runtime in INITIALIZED state is applied
}
// ID returns the instance identifier of the task.
// The field is read without taking the task lock.
func (t *task) ID() uint32 {
	return t.id
}
// JobID returns the identifier of the job the task belongs to.
// The stored pointer is returned as is, without copying and without
// taking the task lock.
func (t *task) JobID() *peloton.JobID {
	return t.jobID
}
// validateMesosTaskID reports whether the run id embedded in mesosTaskID is
// not smaller than the one embedded in prevMesosTaskID, since each
// restart/update of a task increments its run id monotonically.
//
// Ids longer than 2*uuidLength are accepted without comparing run ids:
// when both ids are that long the result is true, and when only the
// previous id cannot be parsed the result is true iff the previous id is
// that long.
// TODO: remove the length checks, post mesostaskID migration.
func validateMesosTaskID(mesosTaskID, prevMesosTaskID string) bool {
	longIDLen := 2 * uuidLength
	prevIsLong := len(prevMesosTaskID) > longIDLen

	if prevIsLong && len(mesosTaskID) > longIDLen {
		return true
	}

	newRunID, err := util.ParseRunID(mesosTaskID)
	if err != nil {
		return false
	}

	prevRunID, err := util.ParseRunID(prevMesosTaskID)
	if err != nil {
		// an unparsable previous id is only tolerated when it is a long id
		return prevIsLong
	}

	return newRunID >= prevRunID
}
// validateState reports whether replacing the cached runtime with newRuntime
// is a valid transition. The checks are applied in this order:
//
//  1. a nil newRuntime is always rejected;
//  2. a DELETED goal state cannot be overwritten unless the desired
//     configuration version changes too;
//  3. when the mesos task id changes, the new run id must pass
//     validateMesosTaskID and the new state must be INITIALIZED;
//  4. the desired mesos task id must pass validateMesosTaskID;
//  5. an update which keeps the state unchanged is accepted;
//  6. a terminal state cannot be left without changing the mesos task id;
//  7. otherwise the (current state, new state) pair must be one of the
//     allowed transitions listed in the switch at the end.
func (t *task) validateState(newRuntime *pbtask.RuntimeInfo) bool {
	// no runtime is invalid
	if newRuntime == nil {
		return false
	}

	cur := t.runtime
	curState := cur.GetState()
	newState := newRuntime.GetState()

	// a DELETED goal state sticks until the desired config version moves
	if cur.GetGoalState() == pbtask.TaskState_DELETED &&
		newRuntime.GetGoalState() != pbtask.TaskState_DELETED &&
		cur.GetDesiredConfigVersion() == newRuntime.GetDesiredConfigVersion() {
		return false
	}

	if newID := newRuntime.GetMesosTaskId(); newID != nil {
		curIDValue := cur.GetMesosTaskId().GetValue()
		if newID.GetValue() != curIDValue {
			// mesos task id has changed: the run id must not go backwards
			// and the task has to start over from INITIALIZED
			return validateMesosTaskID(newID.GetValue(), curIDValue) &&
				newState == pbtask.TaskState_INITIALIZED
		}
	}

	// desired mesos task id should not have its run id decrease at any time
	if desiredID := newRuntime.GetDesiredMesosTaskId(); desiredID != nil &&
		!validateMesosTaskID(
			desiredID.GetValue(),
			cur.GetDesiredMesosTaskId().GetValue()) {
		return false
	}

	// no state update requested
	if newState == curState {
		return true
	}

	// cannot overwrite terminal state without changing the mesos task id
	if util.IsPelotonStateTerminal(curState) {
		return false
	}

	fromResMgr := IsResMgrOwnedState(curState)
	fromInitialized := curState == pbtask.TaskState_INITIALIZED
	toMesos := IsMesosOwnedState(newState)
	toResMgrOrLaunched := IsResMgrOwnedState(newState) ||
		newState == pbtask.TaskState_LAUNCHED

	switch {
	case toMesos && (IsMesosOwnedState(curState) || fromResMgr ||
		fromInitialized || curState == pbtask.TaskState_LAUNCHED):
		// update from mesos eventstream is ok from mesos states, resource
		// manager states and from INITIALIZED and LAUNCHED states
		return true
	case toMesos && curState == pbtask.TaskState_KILLING &&
		util.IsPelotonStateTerminal(newState):
		// from KILLING only terminal mesos states are allowed
		return true
	case toResMgrOrLaunched && (fromResMgr || fromInitialized):
		// resource manager states and LAUNCHED are reachable from
		// resource manager states or from INITIALIZED
		return true
	case newState == pbtask.TaskState_KILLING:
		// update to KILLING state from any non-terminal state is ok
		return true
	default:
		// any other state transition is invalid
		return false
	}
}
// updateConfig refreshes the cached task configuration for configVersion.
// Nothing is fetched when the cache already holds that version; otherwise the
// task config is read through taskConfigV2Ops and its labels and revocable
// flag are cached. On a read error the cache is left untouched.
// It should be called with the write task lock held.
func (t *task) updateConfig(ctx context.Context, configVersion uint64) error {
	if cached := t.config; cached != nil && cached.configVersion == configVersion {
		return nil
	}

	cfg, _, err := t.jobFactory.taskConfigV2Ops.GetTaskConfig(
		ctx, t.jobID, t.id, configVersion)
	if err != nil {
		return err
	}

	fresh := new(taskConfigCache)
	fresh.configVersion = configVersion
	fresh.labels = cfg.GetLabels()
	fresh.revocable = cfg.GetRevocable()
	t.config = fresh
	return nil
}
// cleanTaskCache drops both the cached runtime and the cached config
// (labels included) of the task.
// It should be called with the write task lock held.
func (t *task) cleanTaskCache() {
	t.runtime, t.config = nil, nil
}
// createTask writes runtime to the DB and, once that succeeded, stores it in
// the cache together with the task config of runtime's config version.
// If either the DB write or the config load fails, the cache entry is wiped
// and the error is returned.
//
// Listeners are notified on every return path, after the task lock has been
// released; on failure they receive a nil runtime and nil labels.
func (t *task) createTask(ctx context.Context, runtime *pbtask.RuntimeInfo, owner string) error {
	var (
		notifyRuntime *pbtask.RuntimeInfo
		notifyLabels  []*peloton.Label
	)

	// registered before the unlock below, hence executed after it
	defer func() {
		t.jobFactory.notifyTaskRuntimeChanged(
			t.JobID(), t.ID(), t.jobType, notifyRuntime, notifyLabels)
	}()

	t.Lock()
	defer t.Unlock()

	// DB first, so that the cache never holds a runtime which was not persisted
	if err := t.jobFactory.taskStore.CreateTaskRuntime(
		ctx, t.jobID, t.id, runtime, owner, t.jobType); err != nil {
		t.cleanTaskCache()
		return err
	}

	// the config is cached only after the runtime made it to the DB
	if err := t.updateConfig(ctx, runtime.GetConfigVersion()); err != nil {
		t.cleanTaskCache()
		return err
	}

	t.logStateTransitionMetrics(runtime)
	t.runtime = runtime

	notifyRuntime = proto.Clone(runtime).(*pbtask.RuntimeInfo)
	notifyLabels = t.copyLabelsInCache()
	return nil
}
// patchTask applies diff on top of the cached runtime (loaded from the DB
// when the cache is empty), persists the result and stores it in the cache.
//
// A nil diff, or a diff carrying the Revision field, is rejected with an
// InvalidArgument error before anything else happens. If the patched runtime
// does not pass validateState, the diff is dropped and nil is returned: the
// runtime has been updated by another thread and the diff is no longer valid.
// A failure while writing the runtime to the DB or while refreshing the
// cached config wipes the cache entry and returns the error.
//
// Once the lock has been taken, every return path notifies the listeners
// after the lock is released; they receive a nil runtime and nil labels
// whenever the patch was not applied.
func (t *task) patchTask(ctx context.Context, diff jobmgrcommon.RuntimeDiff) error {
	if diff == nil {
		return yarpcerrors.InvalidArgumentErrorf(
			"unexpected nil diff")
	}
	if _, hasRevision := diff[jobmgrcommon.RevisionField]; hasRevision {
		return yarpcerrors.InvalidArgumentErrorf(
			"unexpected Revision field in diff")
	}

	var (
		notifyRuntime *pbtask.RuntimeInfo
		notifyLabels  []*peloton.Label
	)

	// registered before the unlock below, hence executed after it
	defer func() {
		t.jobFactory.notifyTaskRuntimeChanged(
			t.JobID(), t.ID(), t.jobType, notifyRuntime, notifyLabels)
	}()

	t.Lock()
	defer t.Unlock()

	// nothing cached: load the runtime from the DB first
	if t.runtime == nil {
		if err := t.updateRuntimeFromDB(ctx); err != nil {
			return err
		}
	}

	// patch() updates its argument in place, so work on a shallow copy and
	// leave the cached struct alone until the DB write succeeded
	patched := new(pbtask.RuntimeInfo)
	*patched = *t.runtime
	if err := patch(patched, diff); err != nil {
		return err
	}

	if !t.validateState(patched) {
		return nil
	}

	t.updateRevision(patched)

	if err := t.jobFactory.taskStore.UpdateTaskRuntime(
		ctx, t.jobID, t.id, patched, t.jobType); err != nil {
		// do not keep a runtime in cache which may not match the DB
		t.cleanTaskCache()
		return err
	}

	if err := t.updateConfig(ctx, patched.GetConfigVersion()); err != nil {
		t.cleanTaskCache()
		return err
	}

	t.logStateTransitionMetrics(patched)
	t.runtime = patched

	notifyRuntime = proto.Clone(patched).(*pbtask.RuntimeInfo)
	notifyLabels = t.copyLabelsInCache()
	return nil
}
// compareAndSetTask replaces the existing task runtime in DB and cache.
// It uses RuntimeInfo.Revision.Version for concurrency control, and it
// updates RuntimeInfo.Revision.Version automatically upon success.
// Caller should not manually modify the value of RuntimeInfo.Revision.Version.
//
// Outcomes:
//   - nil runtime: InvalidArgument error;
//   - version of runtime differs from the cached one: the cache entry is
//     wiped and jobmgrcommon.UnexpectedVersionError is returned;
//   - runtime does not pass validateState: (nil, nil) is returned and
//     nothing is written;
//   - DB write or config refresh fails: the cache entry is wiped and the
//     error is returned;
//   - otherwise runtime becomes the cached runtime and a copy of it is
//     returned.
//
// Once the lock has been taken, every return path notifies the listeners
// after the lock is released; they receive a nil runtime and nil labels
// whenever the runtime was not replaced.
func (t *task) compareAndSetTask(
	ctx context.Context,
	runtime *pbtask.RuntimeInfo,
	jobType pbjob.JobType,
) (*pbtask.RuntimeInfo, error) {
	if runtime == nil {
		return nil, yarpcerrors.InvalidArgumentErrorf(
			"unexpected nil runtime")
	}

	var (
		notifyRuntime *pbtask.RuntimeInfo
		notifyLabels  []*peloton.Label
	)

	// registered before the unlock below, hence executed after it
	defer func() {
		t.jobFactory.notifyTaskRuntimeChanged(
			t.JobID(), t.ID(), jobType, notifyRuntime, notifyLabels)
	}()

	t.Lock()
	defer t.Unlock()

	// nothing cached: load the runtime from the DB first
	if t.runtime == nil {
		if err := t.updateRuntimeFromDB(ctx); err != nil {
			return nil, err
		}
	}

	// the caller must have started from the version currently in cache
	wantVersion := t.runtime.GetRevision().GetVersion()
	if gotVersion := runtime.GetRevision().GetVersion(); gotVersion != wantVersion {
		t.cleanTaskCache()
		return nil, jobmgrcommon.UnexpectedVersionError
	}

	// an invalid transition is ignored rather than reported as an error
	if !t.validateState(runtime) {
		return nil, nil
	}

	// bump up the changelog version
	t.updateRevision(runtime)

	if err := t.jobFactory.taskStore.UpdateTaskRuntime(
		ctx, t.jobID, t.id, runtime, jobType); err != nil {
		// do not keep a runtime in cache which may not match the DB
		t.cleanTaskCache()
		return nil, err
	}

	if err := t.updateConfig(ctx, runtime.GetConfigVersion()); err != nil {
		t.cleanTaskCache()
		return nil, err
	}

	t.logStateTransitionMetrics(runtime)
	t.runtime = runtime

	notifyRuntime = proto.Clone(runtime).(*pbtask.RuntimeInfo)
	notifyLabels = t.copyLabelsInCache()
	return notifyRuntime, nil
}
// deleteTask lets the listeners know that the task is being deleted.
// There is no task state to clean up as of now. Listeners are only notified
// when a runtime is cached and its goal state is DELETED; they then receive
// a copy of the cached runtime whose State is set to DELETED, plus a copy of
// the cached labels. The notification is sent after the read lock is dropped.
func (t *task) deleteTask() {
	var (
		deletedRuntime *pbtask.RuntimeInfo
		deletedLabels  []*peloton.Label
	)

	// registered before the unlock below, hence executed after it
	defer func() {
		if deletedRuntime == nil {
			return
		}
		t.jobFactory.notifyTaskRuntimeChanged(
			t.jobID, t.id, t.jobType, deletedRuntime, deletedLabels)
	}()

	t.RLock()
	defer t.RUnlock()

	if t.runtime == nil ||
		t.runtime.GetGoalState() != pbtask.TaskState_DELETED {
		return
	}

	deletedRuntime = proto.Clone(t.runtime).(*pbtask.RuntimeInfo)
	deletedRuntime.State = pbtask.TaskState_DELETED
	deletedLabels = t.copyLabelsInCache()
}
// replaceTask replaces runtime and config in cache with the inputs, without
// touching the DB.
//
// runtime must be non-nil and carry a non-nil Revision, otherwise an
// InvalidArgument error is returned. taskConfig supplies the labels and the
// revocable flag stored in the config cache.
//
// forceReplace decides whether the version check is skipped when replacing
// the runtime and config; it should only be used in Refresh for debugging
// purpose. Without it, the cache is replaced only when it holds no runtime
// or when runtime has a higher revision version than the cached one.
//
// Listeners are notified after the task lock is released; they receive a nil
// runtime and nil labels when the cache was left unchanged.
func (t *task) replaceTask(
	runtime *pbtask.RuntimeInfo,
	taskConfig *pbtask.TaskConfig,
	forceReplace bool) error {
	if runtime == nil || runtime.GetRevision() == nil {
		return yarpcerrors.InvalidArgumentErrorf(
			"replaceTask expects a non-nil runtime with non-nil Revision")
	}

	var runtimeCopy *pbtask.RuntimeInfo
	var labelsCopy []*peloton.Label

	// notify listeners after dropping the lock
	defer func() {
		t.jobFactory.notifyTaskRuntimeChanged(
			t.JobID(),
			t.ID(),
			t.jobType,
			runtimeCopy,
			labelsCopy,
		)
	}()

	t.Lock()
	defer t.Unlock()

	// update the cache if,
	// 1. it is a force replace, or
	// 2. there is no existing runtime cache, or
	// 3. new runtime has a higher version number than the existing
	if forceReplace ||
		t.runtime == nil ||
		runtime.GetRevision().GetVersion() > t.runtime.GetRevision().GetVersion() {
		// Update task config and runtime. The revocable flag has to be
		// carried over like updateConfig does: updateConfig will not reload
		// the config while the version is unchanged, so dropping the flag
		// here would make logStateTransitionMetrics report revocable tasks
		// under the non-revocable metrics.
		t.config = &taskConfigCache{
			configVersion: runtime.GetConfigVersion(),
			labels:        taskConfig.GetLabels(),
			revocable:     taskConfig.GetRevocable(),
		}
		t.runtime = runtime
		runtimeCopy = proto.Clone(t.runtime).(*pbtask.RuntimeInfo)
		labelsCopy = t.copyLabelsInCache()
	}

	return nil
}
// updateRevision bumps the revision of runtime in place: Version is
// incremented by one and UpdatedAt is set to the current time in
// nanoseconds. A missing revision is not expected; should it happen, an
// error is logged and a new ChangeLog stamped with the current time as
// CreatedAt is attached before bumping.
func (t *task) updateRevision(runtime *pbtask.RuntimeInfo) {
	if runtime.Revision == nil {
		// should never enter here
		entry := log.WithField("job_id", t.jobID).
			WithField("instance_id", t.id)
		entry.Error("runtime revision is nil in update tasks")

		created := uint64(time.Now().UnixNano())
		runtime.Revision = &peloton.ChangeLog{CreatedAt: created}
	}

	rev := runtime.Revision
	rev.Version++
	rev.UpdatedAt = uint64(time.Now().UnixNano())
}
// updateRuntimeFromDB makes sure the cache holds a runtime: when none is
// cached it is read from the task store and stored in the cache, otherwise
// nothing is done. On a read error the cache is left as it was.
// This has to be called with the write lock held.
func (t *task) updateRuntimeFromDB(ctx context.Context) error {
	if t.runtime != nil {
		return nil
	}

	loaded, err := t.jobFactory.taskStore.GetTaskRuntime(ctx, t.jobID, t.id)
	if err == nil {
		t.runtime = loaded
	}
	return err
}
// GetRuntime returns a copy of the task runtime. When the cache holds no
// runtime it is first loaded from the DB into the cache, which is why the
// write lock is taken.
func (t *task) GetRuntime(ctx context.Context) (*pbtask.RuntimeInfo, error) {
	t.Lock()
	defer t.Unlock()

	if t.runtime == nil {
		// not in cache, fall back to the DB
		if err := t.updateRuntimeFromDB(ctx); err != nil {
			return nil, err
		}
	}

	return proto.Clone(t.runtime).(*pbtask.RuntimeInfo), nil
}
// GetCacheRuntime returns a copy of the runtime stored in the cache, or nil
// if there is no runtime in the cache. The DB is never consulted.
func (t *task) GetCacheRuntime() *pbtask.RuntimeInfo {
	t.RLock()
	defer t.RUnlock()

	if cached := t.runtime; cached != nil {
		return proto.Clone(cached).(*pbtask.RuntimeInfo)
	}
	return nil
}
// copyLabelsInCache returns a copy of the labels in the cache: every label
// struct is copied by value into a new slice. The result is nil when no
// config is cached or when the cached config has no labels.
// At least a read lock should be held before invoking the API.
func (t *task) copyLabelsInCache() []*peloton.Label {
	if t.config == nil {
		return nil
	}

	var copies []*peloton.Label
	cached := t.config.labels
	for i := range cached {
		dup := *cached[i]
		copies = append(copies, &dup)
	}
	return copies
}
// GetLabels returns a copy of the task labels. Whatever is missing from the
// cache is loaded first: the runtime from the DB, then the task config for
// the runtime's config version. The write lock is taken since the cache may
// be populated.
func (t *task) GetLabels(ctx context.Context) ([]*peloton.Label, error) {
	t.Lock()
	defer t.Unlock()

	if t.runtime == nil {
		if err := t.updateRuntimeFromDB(ctx); err != nil {
			return nil, err
		}
	}

	if t.config == nil {
		if err := t.updateConfig(ctx, t.runtime.GetConfigVersion()); err != nil {
			return nil, err
		}
	}

	labels := t.copyLabelsInCache()
	return labels, nil
}
// CurrentState returns the actual state, config version and mesos task id
// read from the cached runtime under the read lock. The DB is not consulted,
// and the mesos task id pointer is the cached one rather than a copy.
func (t *task) CurrentState() TaskStateVector {
	t.RLock()
	defer t.RUnlock()

	rt := t.runtime

	var current TaskStateVector
	current.State = rt.GetState()
	current.ConfigVersion = rt.GetConfigVersion()
	current.MesosTaskID = rt.GetMesosTaskId()
	return current
}
// GoalState returns the goal state, desired config version and desired mesos
// task id read from the cached runtime under the read lock. The DB is not
// consulted, and the mesos task id pointer is the cached one rather than a
// copy.
func (t *task) GoalState() TaskStateVector {
	t.RLock()
	defer t.RUnlock()

	rt := t.runtime

	var goal TaskStateVector
	goal.State = rt.GetGoalState()
	goal.ConfigVersion = rt.GetDesiredConfigVersion()
	goal.MesosTaskID = rt.GetDesiredMesosTaskId()
	return goal
}
// StateSummary returns the current state, goal state and health state read
// from the cached runtime under the read lock. The DB is not consulted.
func (t *task) StateSummary() TaskStateSummary {
	t.RLock()
	defer t.RUnlock()

	rt := t.runtime

	var summary TaskStateSummary
	summary.CurrentState = rt.GetState()
	summary.GoalState = rt.GetGoalState()
	summary.HealthState = rt.GetHealthy()
	return summary
}
// TerminationStatus returns the termination status found in the cached
// runtime, read under the read lock. The returned pointer is the cached one
// rather than a copy.
func (t *task) TerminationStatus() *pbtask.TerminationStatus {
	t.RLock()
	defer t.RUnlock()

	status := t.runtime.GetTerminationStatus()
	return status
}
// logStateTransitionMetrics records how long the task took, counted from its
// last INITIALIZED runtime, to reach the state of the given (about to be
// cached) runtime. It must be called before t.runtime is overwritten, since
// the cached runtime is used to detect whether the state changes.
//
//   - INITIALIZED only resets the initialization timestamp;
//   - LAUNCHED records a time-to-assign metric;
//   - RUNNING records a time-to-run metric;
//   - any other state records nothing.
//
// Nothing is recorded when the state is unchanged or when no initialization
// timestamp is known. The revocable or non-revocable flavour of the metric is
// picked from the cached config, defaulting to non-revocable without config.
func (t *task) logStateTransitionMetrics(runtime *pbtask.RuntimeInfo) {
	state := runtime.GetState()

	if state == pbtask.TaskState_INITIALIZED {
		t.initializedAt = time.Now()
		return
	}

	// no change in state, nothing to measure
	if t.runtime != nil && t.runtime.GetState() == state {
		return
	}

	// without an initialization time neither time-to-assign nor
	// time-to-run can be computed
	if t.initializedAt.IsZero() {
		return
	}

	revocable := t.config != nil && t.config.revocable
	elapsed := time.Since(t.initializedAt)

	switch {
	case state == pbtask.TaskState_LAUNCHED && revocable:
		t.jobFactory.taskMetrics.TimeToAssignRevocable.Record(elapsed)
	case state == pbtask.TaskState_LAUNCHED:
		t.jobFactory.taskMetrics.TimeToAssignNonRevocable.Record(elapsed)
	case state == pbtask.TaskState_RUNNING && revocable:
		t.jobFactory.taskMetrics.TimeToRunRevocable.Record(elapsed)
	case state == pbtask.TaskState_RUNNING:
		t.jobFactory.taskMetrics.TimeToRunNonRevocable.Record(elapsed)
	}
}
// GetResourceManagerProcessingStates returns the names of the task states
// listed in resMgrOwnedTaskStates, i.e. the active task states in Resource
// Manager. The order of the names follows map iteration and is therefore
// not stable between calls.
func GetResourceManagerProcessingStates() []string {
	names := make([]string, 0, len(resMgrOwnedTaskStates))
	for state := range resMgrOwnedTaskStates {
		names = append(names, state.String())
	}
	return names
}