pkg/jobmgr/cached/workflow_strategy.go (166 lines of code) (raw):
// Copyright (c) 2019 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package cached
import (
pbjob "github.com/uber/peloton/.gen/peloton/api/v0/job"
pbtask "github.com/uber/peloton/.gen/peloton/api/v0/task"
pbupdate "github.com/uber/peloton/.gen/peloton/api/v0/update"
"github.com/uber/peloton/.gen/peloton/private/models"
"github.com/uber/peloton/pkg/common/util"
jobmgrcommon "github.com/uber/peloton/pkg/jobmgr/common"
updateutil "github.com/uber/peloton/pkg/jobmgr/util/update"
)
// Task messages written into a task runtime's MessageField by the
// GetRuntimeDiff implementations of the workflow strategies below.
const (
	// _startTaskMessage is recorded by startStrategy.
	_startTaskMessage = "Task started via API"
	// _stopTaskMessage is recorded by stopStrategy.
	_stopTaskMessage = "Task stopped via API"
	// _restartTaskMessage is recorded by restartStrategy.
	_restartTaskMessage = "Task restarted via API"
	// _updateTaskMessage is recorded by updateStrategy.
	_updateTaskMessage = "Job configuration updated via API"
	// _rollbackTaskMessage is recorded by rollbackStrategy.
	_rollbackTaskMessage = "Job configuration updated due to rollback"
)
// WorkflowStrategy is the strategy of driving instances to
// the desired state of the workflow.
type WorkflowStrategy interface {
	// IsInstanceComplete returns whether an instance, described by its task
	// runtime, has reached the state desired by the workflow for
	// desiredConfigVersion.
	IsInstanceComplete(desiredConfigVersion uint64, runtime *pbtask.RuntimeInfo) bool
	// IsInstanceInProgress returns whether an instance, described by its task
	// runtime, is in the process of getting to the state desired by the
	// workflow for desiredConfigVersion.
	IsInstanceInProgress(desiredConfigVersion uint64, runtime *pbtask.RuntimeInfo) bool
	// IsInstanceFailed returns whether an instance has failed while getting
	// to the state desired by the workflow. maxAttempts is passed through to
	// the implementation's failure check.
	// TODO: now a task can both get true for IsInstanceInProgress and
	// IsInstanceFailed, it should get true for only one of the func.
	// Now the correctness of code is guarded by order of func call.
	IsInstanceFailed(runtime *pbtask.RuntimeInfo, maxAttempts uint32) bool
	// GetRuntimeDiff accepts the desired job config and returns the
	// RuntimeDiff to apply to an instance's runtime to move the instance to
	// the state desired by the workflow. Returns nil if no action is needed.
	GetRuntimeDiff(jobConfig *pbjob.JobConfig) jobmgrcommon.RuntimeDiff
}
// getWorkflowStrategy picks the strategy for a workflow. START, STOP and
// RESTART workflows each have a dedicated strategy. Any other workflow type
// is treated as an update, which uses the rollback strategy while the update
// is rolling backward and the plain update strategy otherwise.
func getWorkflowStrategy(
	updateState pbupdate.State,
	workflowType models.WorkflowType) WorkflowStrategy {
	switch {
	case workflowType == models.WorkflowType_START:
		return newStartStrategy()
	case workflowType == models.WorkflowType_STOP:
		return newStopStrategy()
	case workflowType == models.WorkflowType_RESTART:
		return newRestartStrategy()
	case updateState == pbupdate.State_ROLLING_BACKWARD:
		return newRollbackStrategy()
	default:
		return newUpdateStrategy()
	}
}
// newUpdateStrategy returns the strategy that drives instances to a new job
// configuration version.
func newUpdateStrategy() *updateStrategy {
	s := new(updateStrategy)
	return s
}

// updateStrategy is stateless; all decisions are made from the task runtime
// and the job config passed to its methods.
type updateStrategy struct{}
// IsInstanceComplete reports whether the update is done for an instance.
//
// A RUNNING task is done once its desired config version equals
// desiredConfigVersion, its current config version has caught up with the
// desired one, and its health is either DISABLED or HEALTHY.
//
// A task whose state and goal state are both terminal is done as soon as its
// desired config version equals desiredConfigVersion. Its current config
// version is irrelevant because it is set to the desired version when the
// task starts.
//
// Any other task is not done.
func (s *updateStrategy) IsInstanceComplete(desiredConfigVersion uint64, runtime *pbtask.RuntimeInfo) bool {
	state := runtime.GetState()
	desired := runtime.GetDesiredConfigVersion()

	switch {
	case state == pbtask.TaskState_RUNNING:
		if desired != desiredConfigVersion || runtime.GetConfigVersion() != desired {
			return false
		}
		switch runtime.GetHealthy() {
		case pbtask.HealthState_DISABLED, pbtask.HealthState_HEALTHY:
			return true
		default:
			return false
		}
	case util.IsPelotonStateTerminal(state) &&
		util.IsPelotonStateTerminal(runtime.GetGoalState()):
		return desired == desiredConfigVersion
	default:
		return false
	}
}
// IsInstanceFailed hands the decision to updateutil.HasFailedUpdate, passing
// it the instance runtime and maxAttempts unchanged.
func (s *updateStrategy) IsInstanceFailed(
	runtime *pbtask.RuntimeInfo,
	maxAttempts uint32) bool {
	failed := updateutil.HasFailedUpdate(runtime, maxAttempts)
	return failed
}
// IsInstanceInProgress reports whether the instance has already been pointed
// at desiredConfigVersion but has not yet satisfied IsInstanceComplete.
func (s *updateStrategy) IsInstanceInProgress(desiredConfigVersion uint64, runtime *pbtask.RuntimeInfo) bool {
	if runtime.GetDesiredConfigVersion() != desiredConfigVersion {
		// The update has not touched this instance yet.
		return false
	}
	return !s.IsInstanceComplete(desiredConfigVersion, runtime)
}
// GetRuntimeDiff returns the runtime changes that point an instance at the
// job config's version. The termination reason is marked as killed-for-update.
// The failure count is reset so that failures accumulated under the old
// version do not count against the new one.
func (s *updateStrategy) GetRuntimeDiff(jobConfig *pbjob.JobConfig) jobmgrcommon.RuntimeDiff {
	version := jobConfig.GetChangeLog().GetVersion()
	killedForUpdate := &pbtask.TerminationStatus{
		Reason: pbtask.TerminationStatus_TERMINATION_STATUS_REASON_KILLED_FOR_UPDATE,
	}
	return jobmgrcommon.RuntimeDiff{
		jobmgrcommon.DesiredConfigVersionField: version,
		jobmgrcommon.MessageField:              _updateTaskMessage,
		jobmgrcommon.FailureCountField:         uint32(0),
		jobmgrcommon.ReasonField:               "",
		jobmgrcommon.TerminationStatusField:    killedForUpdate,
	}
}
// newRollbackStrategy returns the strategy used while an update is rolling
// backward. rollbackStrategy embeds an updateStrategy, so it inherits the
// update completion, progress and failure checks and overrides
// GetRuntimeDiff.
func newRollbackStrategy() *rollbackStrategy {
	return &rollbackStrategy{newUpdateStrategy()}
}

// rollbackStrategy reuses the embedded strategy (an updateStrategy when built
// by newRollbackStrategy) for every method it does not override.
type rollbackStrategy struct {
	WorkflowStrategy
}
// GetRuntimeDiff returns the runtime changes that move an instance to the
// rolled-back job configuration.
//
// A rollback is an update to an older version. The diff is therefore exactly
// the update diff (desired config version, failure-count reset, cleared
// reason, killed-for-update termination status), except for the task message,
// which records that the change came from a rollback.
//
// The diff is derived from updateStrategy.GetRuntimeDiff instead of being
// duplicated, so the two cannot drift apart. A fresh updateStrategy is used
// rather than the embedded field, so a zero-value rollbackStrategy does not
// panic.
func (s *rollbackStrategy) GetRuntimeDiff(jobConfig *pbjob.JobConfig) jobmgrcommon.RuntimeDiff {
	diff := newUpdateStrategy().GetRuntimeDiff(jobConfig)
	diff[jobmgrcommon.MessageField] = _rollbackTaskMessage
	return diff
}
// newRestartStrategy returns the strategy for the RESTART workflow.
// restartStrategy embeds an updateStrategy, so it inherits the update
// completion, progress and failure checks and overrides GetRuntimeDiff.
func newRestartStrategy() *restartStrategy {
	return &restartStrategy{newUpdateStrategy()}
}

// restartStrategy reuses the embedded strategy (an updateStrategy when built
// by newRestartStrategy) for every method it does not override.
type restartStrategy struct {
	WorkflowStrategy
}
// GetRuntimeDiff returns the runtime changes for restarting an instance:
//   - the goal state is reset to the job type's default,
//   - the desired config version is set to the job config's version,
//   - the reason is cleared,
//   - the termination status is marked as killed-for-restart.
//
// Unlike updateStrategy, this diff does not touch the failure count.
func (s *restartStrategy) GetRuntimeDiff(jobConfig *pbjob.JobConfig) jobmgrcommon.RuntimeDiff {
	goal := getDefaultTaskGoalState(jobConfig.GetType())
	version := jobConfig.GetChangeLog().GetVersion()
	killedForRestart := &pbtask.TerminationStatus{
		Reason: pbtask.TerminationStatus_TERMINATION_STATUS_REASON_KILLED_FOR_RESTART,
	}
	return jobmgrcommon.RuntimeDiff{
		jobmgrcommon.GoalStateField:            goal,
		jobmgrcommon.DesiredConfigVersionField: version,
		jobmgrcommon.MessageField:              _restartTaskMessage,
		jobmgrcommon.ReasonField:               "",
		jobmgrcommon.TerminationStatusField:    killedForRestart,
	}
}
// newStartStrategy returns the strategy for the START workflow.
// startStrategy embeds an updateStrategy, so it inherits the update
// completion, progress and failure checks and overrides GetRuntimeDiff.
func newStartStrategy() *startStrategy {
	return &startStrategy{newUpdateStrategy()}
}

// startStrategy reuses the embedded strategy (an updateStrategy when built by
// newStartStrategy) for every method it does not override.
type startStrategy struct {
	WorkflowStrategy
}
// GetRuntimeDiff returns the runtime changes for starting an instance. It
// sets the goal state to the job type's default and points both the config
// version and the desired config version at the job config's version.
//
// Both versions are written unconditionally, regardless of the task's current
// state. Suppose only the desired version were written, and only for terminal
// tasks. Another thread could then move the task to a non-terminal state
// concurrently, and the start would turn into a restart.
func (s *startStrategy) GetRuntimeDiff(jobConfig *pbjob.JobConfig) jobmgrcommon.RuntimeDiff {
	version := jobConfig.GetChangeLog().GetVersion()
	goal := getDefaultTaskGoalState(jobConfig.GetType())
	return jobmgrcommon.RuntimeDiff{
		jobmgrcommon.GoalStateField:            goal,
		jobmgrcommon.ConfigVersionField:        version,
		jobmgrcommon.DesiredConfigVersionField: version,
		jobmgrcommon.MessageField:              _startTaskMessage,
		jobmgrcommon.ReasonField:               "",
	}
}
// newStopStrategy returns the strategy for the STOP workflow.
func newStopStrategy() *stopStrategy {
	s := new(stopStrategy)
	return s
}

// stopStrategy is stateless and, unlike the other non-update strategies, does
// not embed updateStrategy; it implements every WorkflowStrategy method itself.
type stopStrategy struct{}
// IsInstanceComplete reports whether the stop workflow is done with an
// instance. It intentionally does not reuse updateStrategy.IsInstanceComplete.
// stopStrategy also writes the config version in GetRuntimeDiff, which
// updateStrategy does not.
//
// Nothing is complete until the runtime's desired config version equals
// desiredConfigVersion. After that, the stop is complete when either:
//  1. both the task state and its goal state are terminal. The runtime config
//     version does not matter, since it is set to the desired config version
//     when the task starts; or
//  2. the goal state is not terminal. This means the user may have started
//     the task again through the task-level API. Without this case the stop
//     workflow could get stuck.
func (s *stopStrategy) IsInstanceComplete(desiredConfigVersion uint64, runtime *pbtask.RuntimeInfo) bool {
	if runtime.GetDesiredConfigVersion() != desiredConfigVersion {
		return false
	}
	// (terminal state && terminal goal) || !terminal goal
	// is equivalent to: terminal state || !terminal goal.
	return util.IsPelotonStateTerminal(runtime.GetState()) ||
		!util.IsPelotonStateTerminal(runtime.GetGoalState())
}
// IsInstanceInProgress reports whether the stop has been applied to the
// instance (desired config version already equals desiredConfigVersion) but
// IsInstanceComplete is not yet satisfied.
func (s *stopStrategy) IsInstanceInProgress(desiredConfigVersion uint64, runtime *pbtask.RuntimeInfo) bool {
	if runtime.GetDesiredConfigVersion() != desiredConfigVersion {
		return false
	}
	return !s.IsInstanceComplete(desiredConfigVersion, runtime)
}
// IsInstanceFailed always reports false: the stop workflow never marks an
// instance as failed, so runtime and maxAttempts are ignored.
func (s *stopStrategy) IsInstanceFailed(
	runtime *pbtask.RuntimeInfo,
	maxAttempts uint32) bool {
	const stopNeverFails = false
	return stopNeverFails
}
// GetRuntimeDiff returns the runtime changes for stopping an instance. Both
// the config version and the desired config version are set to the job
// config's version, and the termination status records a kill on request.
// The goal state is always set to KILLED, so that no failure retry is
// attempted.
func (s *stopStrategy) GetRuntimeDiff(jobConfig *pbjob.JobConfig) jobmgrcommon.RuntimeDiff {
	version := jobConfig.GetChangeLog().GetVersion()
	killedOnRequest := &pbtask.TerminationStatus{
		Reason: pbtask.TerminationStatus_TERMINATION_STATUS_REASON_KILLED_ON_REQUEST,
	}
	return jobmgrcommon.RuntimeDiff{
		jobmgrcommon.ConfigVersionField:        version,
		jobmgrcommon.DesiredConfigVersionField: version,
		jobmgrcommon.GoalStateField:            pbtask.TaskState_KILLED,
		jobmgrcommon.MessageField:              _stopTaskMessage,
		jobmgrcommon.ReasonField:               "",
		jobmgrcommon.TerminationStatusField:    killedOnRequest,
	}
}
// getDefaultTaskGoalState returns the goal state a task of the given job type
// runs toward: RUNNING for SERVICE jobs, SUCCEEDED for every other type.
// TODO: reuse the function in jobmgr/util, now it would create import cycle.
func getDefaultTaskGoalState(jobType pbjob.JobType) pbtask.TaskState {
	if jobType == pbjob.JobType_SERVICE {
		return pbtask.TaskState_RUNNING
	}
	return pbtask.TaskState_SUCCEEDED
}