// pkg/jobmgr/goalstate/update_run.go
// Copyright (c) 2019 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package goalstate

import (
	"context"
	"time"

	pbjob "github.com/uber/peloton/.gen/peloton/api/v0/job"
	"github.com/uber/peloton/.gen/peloton/api/v0/peloton"
	pbtask "github.com/uber/peloton/.gen/peloton/api/v0/task"
	pbupdate "github.com/uber/peloton/.gen/peloton/api/v0/update"
	"github.com/uber/peloton/.gen/peloton/private/models"

	"github.com/uber/peloton/pkg/common/goalstate"
	"github.com/uber/peloton/pkg/common/taskconfig"
	"github.com/uber/peloton/pkg/common/util"
	"github.com/uber/peloton/pkg/jobmgr/cached"
	jobmgrcommon "github.com/uber/peloton/pkg/jobmgr/common"
	"github.com/uber/peloton/pkg/jobmgr/task"

	log "github.com/sirupsen/logrus"
	"go.uber.org/yarpc/yarpcerrors"
)

// UpdateRun checks which instances have completed their update,
// starts the next set of instances to update, and writes the new
// state of the job update to the cache and DB.
func UpdateRun(ctx context.Context, entity goalstate.Entity) error {
updateEnt := entity.(*updateEntity)
goalStateDriver := updateEnt.driver
cachedWorkflow, cachedJob, err := fetchWorkflowAndJobFromCache(
ctx, updateEnt.jobID, updateEnt.id, goalStateDriver)
if err != nil || cachedWorkflow == nil || cachedJob == nil {
log.WithFields(log.Fields{
"update_id": updateEnt.id.GetValue(),
}).WithError(err).Info("unable to run update")
goalStateDriver.mtx.updateMetrics.UpdateRunFail.Inc(1)
return err
}
	// TODO: remove this check once recovery is done when reading the state
if cachedWorkflow.GetState().State == pbupdate.State_INVALID {
return UpdateReload(ctx, entity)
}
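	// figure out which of the instances being processed have finished
	// or failed since the last run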
instancesCurrent, instancesDoneFromLastRun, instancesFailedFromLastRun, err :=
cached.GetUpdateProgress(
ctx,
cachedJob.ID(),
cachedWorkflow,
cachedWorkflow.GetGoalState().JobVersion,
cachedWorkflow.GetInstancesCurrent(),
goalStateDriver.taskStore,
)
if err != nil {
goalStateDriver.mtx.updateMetrics.UpdateRunFail.Inc(1)
return err
}
instancesFailed := append(
cachedWorkflow.GetInstancesFailed(),
instancesFailedFromLastRun...)
instancesDone := append(
cachedWorkflow.GetInstancesDone(),
instancesDoneFromLastRun...)
	// if MaxFailureInstances is set and the number of failed instances
	// in the workflow has reached that limit, process the failed update
	// and return directly
	// TODO: use job SLA if GetMaxFailureInstances is not set
if cachedWorkflow.GetUpdateConfig().GetMaxFailureInstances() != 0 &&
uint32(len(instancesFailed)) >=
cachedWorkflow.GetUpdateConfig().GetMaxFailureInstances() {
err := processFailedUpdate(
ctx,
cachedJob,
cachedWorkflow,
instancesDone,
instancesFailed,
instancesCurrent,
goalStateDriver,
)
if err != nil {
goalStateDriver.mtx.updateMetrics.UpdateRunFail.Inc(1)
}
return err
}
instancesToAdd, instancesToUpdate, instancesToRemove :=
getInstancesForUpdateRun(
ctx,
cachedJob,
cachedWorkflow,
instancesCurrent,
instancesDone,
instancesFailed,
)
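	// verify the add/update/remove buckets against the actual task
	// runtimes: instances to add may already exist (and then need an
	// update instead), and instances to remove may already be gone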
instancesToAdd, instancesToUpdate, instancesToRemove, instancesRemovedDone, err :=
confirmInstancesStatus(
ctx,
cachedJob,
cachedWorkflow,
instancesToAdd,
instancesToUpdate,
instancesToRemove,
)
if err != nil {
goalStateDriver.mtx.updateMetrics.UpdateRunFail.Inc(1)
return err
}
instancesDone = append(instancesDone, instancesRemovedDone...)
if err := processUpdate(
ctx,
cachedJob,
cachedWorkflow,
instancesToAdd,
instancesToUpdate,
instancesToRemove,
goalStateDriver,
); err != nil {
goalStateDriver.mtx.updateMetrics.UpdateRunFail.Inc(1)
return err
}
if err := writeUpdateProgress(
ctx,
cachedJob,
cachedWorkflow,
cachedWorkflow.GetState().State,
instancesDone,
instancesFailed,
instancesCurrent,
instancesToAdd,
instancesToUpdate,
instancesToRemove,
); err != nil {
goalStateDriver.mtx.updateMetrics.UpdateRunFail.Inc(1)
return err
}
if err := postUpdateAction(
ctx,
cachedJob,
cachedWorkflow,
instancesToAdd,
instancesToUpdate,
instancesToRemove,
instancesDone,
instancesFailed,
goalStateDriver); err != nil {
goalStateDriver.mtx.updateMetrics.UpdateRunFail.Inc(1)
return err
}
// TODO (varung):
// - Use len for instances current
// - Remove instances_added, instances_removed and instances_updated
log.WithFields(log.Fields{
"update_id": updateEnt.id.GetValue(),
"job_id": cachedJob.ID().GetValue(),
"update_type": cachedWorkflow.GetWorkflowType().String(),
"instances_current": cachedWorkflow.GetInstancesCurrent(),
"instances_failed": len(cachedWorkflow.GetInstancesFailed()),
"instances_done": len(cachedWorkflow.GetInstancesDone()),
"instances_added": len(cachedWorkflow.GetInstancesAdded()),
"instances_removed": len(cachedWorkflow.GetInstancesRemoved()),
"instances_updated": len(cachedWorkflow.GetInstancesUpdated()),
}).Info("update running")
goalStateDriver.mtx.updateMetrics.UpdateRun.Inc(1)
return nil
}

// processFailedUpdate is called when the update fails because too
// many instances failed during the process. It moves the update to
// the FAILED state (or rolls it back if RollbackOnFailure is set) and
// re-enqueues the update into the goal state engine.
func processFailedUpdate(
ctx context.Context,
cachedJob cached.Job,
cachedUpdate cached.Update,
instancesDone []uint32,
instancesFailed []uint32,
instancesCurrent []uint32,
driver *driver,
) error {
// rollback the update if RollbackOnFailure is set and
// the update itself is not a rollback
if cachedUpdate.GetUpdateConfig().RollbackOnFailure &&
!isUpdateRollback(cachedUpdate) {
		// write the progress first, because once the rollback starts,
		// the workflow would not otherwise know about the newly
		// finished/failed instances.
		if err := cachedJob.WriteWorkflowProgress(
			ctx,
			cachedUpdate.ID(),
			cachedUpdate.GetState().State,
			instancesDone,
			instancesFailed,
			instancesCurrent,
		); err != nil {
			return err
		}
if err := cachedJob.RollbackWorkflow(ctx); err != nil {
log.WithFields(log.Fields{
"update_id": cachedUpdate.ID().GetValue(),
"job_id": cachedJob.ID().GetValue(),
}).WithError(err).
Info("fail to rollback update")
return err
}
cachedConfig, err := cachedJob.GetConfig(ctx)
if err != nil {
log.WithFields(log.Fields{
"update_id": cachedUpdate.ID().GetValue(),
"job_id": cachedJob.ID().GetValue(),
}).WithError(err).
Info("fail to get job config to rollback update")
return err
}
if err := handleUnchangedInstancesInUpdate(
ctx,
cachedUpdate,
cachedJob,
cachedConfig,
); err != nil {
log.WithFields(log.Fields{
"update_id": cachedUpdate.ID().GetValue(),
"job_id": cachedJob.ID().GetValue(),
}).WithError(err).
Info("fail to update unchanged instances to rollback update")
return err
}
log.WithFields(log.Fields{
"update_id": cachedUpdate.ID().GetValue(),
"job_id": cachedJob.ID().GetValue(),
}).Info("update rolling back")
} else {
if err := cachedJob.WriteWorkflowProgress(
ctx,
cachedUpdate.ID(),
pbupdate.State_FAILED,
instancesDone,
instancesFailed,
instancesCurrent,
); err != nil {
return err
}
}
driver.EnqueueUpdate(cachedJob.ID(), cachedUpdate.ID(), time.Now())
return nil
}

// isUpdateRollback returns whether an update is rolling back to a
// previous job version.
func isUpdateRollback(cachedUpdate cached.Update) bool {
if cachedUpdate.GetWorkflowType() != models.WorkflowType_UPDATE {
return false
}
return cachedUpdate.GetState().State == pbupdate.State_ROLLING_BACKWARD
}

// postUpdateAction performs actions after one run of UpdateRun
// finishes. Its jobs:
// 1. Re-enqueue the update if all instances have finished.
// 2. Re-enqueue the update if any task updated/removed in this run
// has already been updated/killed.
func postUpdateAction(
ctx context.Context,
cachedJob cached.Job,
cachedUpdate cached.Update,
instancesAddedInCurrentRun []uint32,
instancesUpdatedInCurrentRun []uint32,
instancesRemovedInCurrentRun []uint32,
instancesDone []uint32,
instancesFailed []uint32,
goalStateDriver Driver,
) error {
// update finishes, reenqueue the update
if len(cachedUpdate.GetGoalState().Instances) == len(instancesDone)+len(instancesFailed) {
goalStateDriver.EnqueueUpdate(
cachedJob.ID(),
cachedUpdate.ID(),
time.Now())
return nil
}
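	// combine all instances processed in this run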
instancesInCurrentRun :=
append(instancesAddedInCurrentRun,
append(instancesUpdatedInCurrentRun, instancesRemovedInCurrentRun...)...)
	// if any of the tasks updated/removed in this run is a killed task
	// or has already finished its update/kill, re-enqueue the update,
	// because more instances can be processed without waiting for a
	// task event.
for _, instanceID := range instancesInCurrentRun {
cachedTask := cachedJob.GetTask(instanceID)
if cachedTask == nil {
continue
}
runtime, err := cachedTask.GetRuntime(ctx)
if err != nil {
return err
}
		// directly begin the next round, because some tasks have
		// already completed their update and more instances can be
		// processed without waiting.
if isTaskUpdateCompleted(cachedUpdate, runtime) ||
isTaskTerminated(runtime) {
goalStateDriver.EnqueueUpdate(
cachedJob.ID(), cachedUpdate.ID(), time.Now())
return nil
}
}
return nil
}

// isTaskUpdateCompleted returns whether a task has already finished
// its update. This covers the special case in which UpdateRun is
// retried multiple times and the tasks updated in a previous run have
// already completed; no more task events will be received for them,
// so JobMgr needs to handle this case separately.
func isTaskUpdateCompleted(cachedUpdate cached.Update, runtime *pbtask.RuntimeInfo) bool {
return runtime.GetState() == pbtask.TaskState_RUNNING &&
runtime.GetConfigVersion() == runtime.GetDesiredConfigVersion() &&
runtime.GetConfigVersion() == cachedUpdate.GetGoalState().JobVersion
}

// isTaskTerminated returns whether a task is terminated and would
// not be started again.
func isTaskTerminated(runtime *pbtask.RuntimeInfo) bool {
return util.IsPelotonStateTerminal(runtime.GetState()) &&
util.IsPelotonStateTerminal(runtime.GetGoalState())
}
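
// writeUpdateProgress persists the new update state together with the
// instances that are done, failed, and currently being processed; the
// instances added/updated/removed in this run are appended to the
// current set.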
func writeUpdateProgress(
ctx context.Context,
cachedJob cached.Job,
cachedUpdate cached.Update,
updateState pbupdate.State,
instancesDone []uint32,
instancesFailed []uint32,
previousInstancesCurrent []uint32,
instancesAdded []uint32,
instancesUpdated []uint32,
instancesRemoved []uint32,
) error {
newInstancesCurrent := append(previousInstancesCurrent, instancesAdded...)
newInstancesCurrent = append(newInstancesCurrent, instancesUpdated...)
newInstancesCurrent = append(newInstancesCurrent, instancesRemoved...)
// update the state of the job update
return cachedJob.WriteWorkflowProgress(
ctx,
cachedUpdate.ID(),
updateState,
instancesDone,
instancesFailed,
newInstancesCurrent,
)
}
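
// processUpdate fetches the job config at the update's goal version
// and then adds, updates, and removes the instances chosen for this
// run.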
func processUpdate(
ctx context.Context,
cachedJob cached.Job,
cachedUpdate cached.Update,
instancesToAdd []uint32,
instancesToUpdate []uint32,
instancesToRemove []uint32,
goalStateDriver *driver) error {
	// no action needed if there are no instances to add/update/remove
if len(instancesToUpdate)+len(instancesToAdd)+len(instancesToRemove) == 0 {
return nil
}
jobConfig, _, err := goalStateDriver.jobConfigOps.Get(
ctx,
cachedJob.ID(),
cachedUpdate.GetGoalState().JobVersion)
if err != nil {
return err
}
err = addInstancesInUpdate(
ctx,
cachedJob,
instancesToAdd,
jobConfig,
goalStateDriver)
if err != nil {
return err
}
err = processInstancesInUpdate(
ctx,
cachedJob,
cachedUpdate,
instancesToUpdate,
jobConfig,
goalStateDriver,
)
if err != nil {
return err
}
err = removeInstancesInUpdate(
ctx,
cachedJob,
instancesToRemove,
jobConfig,
goalStateDriver,
)
return err
}

// addInstancesInUpdate adds the instances specified in instancesToAdd
// to cachedJob. It creates the new task runtimes and sends the tasks
// to resmgr. If the job goal state is KILLED, the new tasks are
// created directly in the KILLED state to avoid unnecessary task
// creation.
func addInstancesInUpdate(
ctx context.Context,
cachedJob cached.Job,
instancesToAdd []uint32,
jobConfig *pbjob.JobConfig,
goalStateDriver *driver) error {
var tasks []*pbtask.TaskInfo
runtimes := make(map[uint32]*pbtask.RuntimeInfo)
if len(instancesToAdd) == 0 {
return nil
}
	// fetch the job runtime to check the job goal state below
jobRuntime, err := cachedJob.GetRuntime(ctx)
if err != nil {
return err
}
	// now let's add the new instances
for _, instID := range instancesToAdd {
runtime, err := getTaskRuntimeIfExisted(ctx, cachedJob, instID)
if err != nil {
return err
}
if runtime != nil {
if runtime.GetState() == pbtask.TaskState_INITIALIZED {
// runtime is initialized, do not create the task again and directly
// send to ResMgr
taskInfo := &pbtask.TaskInfo{
JobId: cachedJob.ID(),
InstanceId: instID,
Runtime: runtime,
Config: taskconfig.Merge(
jobConfig.GetDefaultConfig(),
jobConfig.GetInstanceConfig()[instID]),
}
tasks = append(tasks, taskInfo)
			} else {
				log.WithFields(log.Fields{
					"job_id":      cachedJob.ID().GetValue(),
					"instance_id": instID,
					"state":       runtime.GetState().String(),
				}).Info(
					"task to add in update already has a runtime in non-INITIALIZED state")
			}
} else {
// runtime is nil, initialize the runtime
runtime := task.CreateInitializingTask(
cachedJob.ID(), instID, jobConfig)
if err = updateWithRecentRunID(
ctx,
cachedJob.ID(),
instID,
runtime,
goalStateDriver); err != nil {
return err
}
runtime.ConfigVersion = jobConfig.GetChangeLog().GetVersion()
runtime.DesiredConfigVersion =
jobConfig.GetChangeLog().GetVersion()
// job goal state is KILLED, set task cur and desired state to KILLED to
// avoid unnecessary task creation
if jobRuntime.GetGoalState() == pbjob.JobState_KILLED {
runtime.State = pbtask.TaskState_KILLED
runtime.GoalState = pbtask.TaskState_KILLED
}
// do not send to resmgr if task goal state is KILLED
if runtime.GetGoalState() != pbtask.TaskState_KILLED {
taskInfo := &pbtask.TaskInfo{
JobId: cachedJob.ID(),
InstanceId: instID,
Runtime: runtime,
Config: taskconfig.Merge(
jobConfig.GetDefaultConfig(),
jobConfig.GetInstanceConfig()[instID]),
}
tasks = append(tasks, taskInfo)
}
runtimes[instID] = runtime
}
}
// Create the tasks
if len(runtimes) > 0 {
if err := cachedJob.CreateTaskRuntimes(ctx, runtimes, "peloton"); err != nil {
return err
}
}
// send to resource manager
return sendTasksToResMgr(
ctx, cachedJob.ID(), tasks, jobConfig, goalStateDriver)
}

// getTaskRuntimeIfExisted returns the task runtime if the task has
// been created. It returns a nil RuntimeInfo and a nil error if the
// task runtime does not exist.
func getTaskRuntimeIfExisted(
ctx context.Context,
cachedJob cached.Job,
instanceID uint32,
) (*pbtask.RuntimeInfo, error) {
cachedTask := cachedJob.GetTask(instanceID)
if cachedTask == nil {
return nil, nil
}
runtime, err := cachedTask.GetRuntime(ctx)
if yarpcerrors.IsNotFound(err) {
return nil, nil
}
if err != nil {
return nil, err
}
return runtime, nil
}

// processInstancesInUpdate updates the existing instances in
// instancesToUpdate by patching their runtimes and enqueueing them
// into the task goal state engine.
func processInstancesInUpdate(
ctx context.Context,
cachedJob cached.Job,
cachedUpdate cached.Update,
instancesToUpdate []uint32,
jobConfig *pbjob.JobConfig,
goalStateDriver *driver) error {
if len(instancesToUpdate) == 0 {
return nil
}
runtimes := make(map[uint32]jobmgrcommon.RuntimeDiff)
for _, instID := range instancesToUpdate {
runtimeDiff := cachedUpdate.GetRuntimeDiff(jobConfig)
if runtimeDiff != nil {
cachedTask, err := cachedJob.AddTask(ctx, instID)
if err != nil {
return err
}
runtime, err := cachedTask.GetRuntime(ctx)
if err != nil {
return err
}
if cachedUpdate.GetUpdateConfig().GetInPlace() {
runtimeDiff[jobmgrcommon.DesiredHostField] = getDesiredHostField(runtime)
} else {
runtimeDiff[jobmgrcommon.DesiredHostField] = ""
}
if runtime.GetGoalState() == pbtask.TaskState_DELETED ||
cachedUpdate.GetUpdateConfig().GetStartTasks() {
runtimeDiff[jobmgrcommon.GoalStateField] = pbtask.TaskState_RUNNING
}
runtimes[instID] = runtimeDiff
}
}
if len(runtimes) > 0 {
		// we do not need to handle `instancesToBeRetried` here: since
		// all instances in `instancesToUpdate` are enqueued into the
		// Task goal state engine, their runtimes will be reloaded into
		// the cache when they are evaluated, and the update will be
		// retried in the next update cycle.
if _, _, err := cachedJob.PatchTasks(ctx, runtimes, false); err != nil {
return err
}
}
for _, instID := range instancesToUpdate {
goalStateDriver.EnqueueTask(cachedJob.ID(), instID, time.Now())
}
return nil
}
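
// getDesiredHostField returns the value to set as the desired host of
// a task being updated in place.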
func getDesiredHostField(runtime *pbtask.RuntimeInfo) string {
	// The desired host field is reset when the task runs again. If the
	// field is not reset when the task is being updated, it means
	// either the task is in LAUNCHED/STARTING state, or the previous
	// update overwrote it when the task was killed. In either case,
	// just reuse the previous desired host field.
if len(runtime.GetDesiredHost()) != 0 {
return runtime.GetDesiredHost()
}
	// The host field is set when the task is launched and reset when
	// the task is killed. For all the states in between, the task may
	// be running on the host already, so set the current host as the
	// desired host.
if !util.IsPelotonStateTerminal(runtime.GetState()) {
return runtime.GetHost()
}
return ""
}

// removeInstancesInUpdate kills the instances being removed in the update.
func removeInstancesInUpdate(
ctx context.Context,
cachedJob cached.Job,
instancesToRemove []uint32,
jobConfig *pbjob.JobConfig,
goalStateDriver *driver) error {
if len(instancesToRemove) == 0 {
return nil
}
runtimes := make(map[uint32]jobmgrcommon.RuntimeDiff)
for _, instID := range instancesToRemove {
runtimes[instID] = jobmgrcommon.RuntimeDiff{
jobmgrcommon.GoalStateField: pbtask.TaskState_DELETED,
jobmgrcommon.DesiredConfigVersionField: jobConfig.GetChangeLog().GetVersion(),
jobmgrcommon.MessageField: "Task Count reduced via API",
jobmgrcommon.TerminationStatusField: &pbtask.TerminationStatus{
Reason: pbtask.TerminationStatus_TERMINATION_STATUS_REASON_KILLED_FOR_UPDATE,
},
jobmgrcommon.FailureCountField: uint32(0),
}
}
if len(runtimes) > 0 {
		// we do not need to handle `instancesToBeRetried` here: since
		// the instances are enqueued into the Task goal state engine,
		// their runtimes will be reloaded into the cache when they are
		// evaluated, and the update will be retried in the next update
		// cycle.
if _, _, err := cachedJob.PatchTasks(ctx, runtimes, false); err != nil {
return err
}
}
for _, instID := range instancesToRemove {
goalStateDriver.EnqueueTask(cachedJob.ID(), instID, time.Now())
}
return nil
}
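
// confirmInstancesStatus verifies the status of the instances to be
// added, updated, and removed against the task runtimes, and corrects
// the buckets accordingly: an instance to add that already exists at
// an older config version is moved to the update bucket, and an
// instance to remove that no longer exists is marked done.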
func confirmInstancesStatus(
ctx context.Context,
cachedJob cached.Job,
cachedUpdate cached.Update,
instancesToAdd []uint32,
instancesToUpdate []uint32,
instancesToRemove []uint32,
) (
newInstancesToAdd []uint32,
newInstancesToUpdate []uint32,
newInstancesToRemove []uint32,
instancesDone []uint32,
err error,
) {
for _, instID := range instancesToAdd {
var cachedTask cached.Task
var runtime *pbtask.RuntimeInfo
cachedTask, err = cachedJob.AddTask(ctx, instID)
if err == nil {
runtime, err = cachedTask.GetRuntime(ctx)
if err != nil {
if yarpcerrors.IsNotFound(err) {
					// runtime does not exist, let's try to add it
newInstancesToAdd = append(newInstancesToAdd, instID)
continue
}
// got some error, just retry later
return
}
// instance already exists
if runtime.GetConfigVersion() == cachedUpdate.GetGoalState().JobVersion {
// instance exists with correct configuration version
newInstancesToAdd = append(newInstancesToAdd, instID)
} else {
// instance exists with previous configuration version,
// hence needs to be updated
newInstancesToUpdate = append(newInstancesToUpdate, instID)
}
continue
}
if yarpcerrors.IsNotFound(err) ||
err == cached.InstanceIDExceedsInstanceCountError {
// instance does not exist
newInstancesToAdd = append(newInstancesToAdd, instID)
continue
}
// got some error, just retry later
return
}
for _, instID := range instancesToUpdate {
var cachedTask cached.Task
cachedTask, err = cachedJob.AddTask(ctx, instID)
if err != nil {
if yarpcerrors.IsNotFound(err) {
// not found, add it
newInstancesToAdd = append(newInstancesToAdd, instID)
continue
}
// got some error, just retry later
return
}
_, err = cachedTask.GetRuntime(ctx)
if err != nil {
if yarpcerrors.IsNotFound(err) {
// not found, add it
newInstancesToAdd = append(newInstancesToAdd, instID)
continue
}
// got some error, just retry later
return
}
newInstancesToUpdate = append(newInstancesToUpdate, instID)
}
for _, instID := range instancesToRemove {
_, err = cachedJob.AddTask(ctx, instID)
if err != nil {
if yarpcerrors.IsNotFound(err) ||
err == cached.InstanceIDExceedsInstanceCountError {
// not found, already removed
instancesDone = append(instancesDone, instID)
continue
}
return
}
// remove it
newInstancesToRemove = append(newInstancesToRemove, instID)
}
// clear the error and return
err = nil
return
}

// getInstancesForUpdateRun returns the instances to add, update, and
// remove in the given run of UpdateRun, honoring the update batch
// size.
func getInstancesForUpdateRun(
ctx context.Context,
cachedJob cached.Job,
update cached.Update,
instancesCurrent []uint32,
instancesDone []uint32,
instancesFailed []uint32,
) (
instancesToAdd []uint32,
instancesToUpdate []uint32,
instancesToRemove []uint32,
) {
unprocessedInstancesToAdd,
unprocessedInstancesToUpdate,
unprocessedInstancesToRemove := getUnprocessedInstances(
update,
instancesCurrent,
instancesDone,
instancesFailed,
)
if len(unprocessedInstancesToUpdate) != 0 {
unprocessedInstancesToUpdate = sortInstancesByAvailability(
ctx,
cachedJob,
unprocessedInstancesToUpdate,
)
}
// if batch size is 0 or updateConfig is nil, update all of the instances
if update.GetUpdateConfig().GetBatchSize() == 0 {
return unprocessedInstancesToAdd,
unprocessedInstancesToUpdate,
unprocessedInstancesToRemove
}
maxNumOfInstancesToProcess :=
int(update.GetUpdateConfig().GetBatchSize()) - len(instancesCurrent)
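	// e.g. with a batch size of 10 and 4 instances still in progress,
	// at most 6 new instances may be started in this run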
	// if the number of instances already being processed meets or
	// exceeds the batch size, do not start anything new
if maxNumOfInstancesToProcess <= 0 {
return nil, nil, nil
}
// if can process all of the remaining instances
if maxNumOfInstancesToProcess >
len(unprocessedInstancesToAdd)+len(unprocessedInstancesToUpdate)+
len(unprocessedInstancesToRemove) {
return unprocessedInstancesToAdd,
unprocessedInstancesToUpdate,
unprocessedInstancesToRemove
}
// if can process all of the instances to add, update
// and part of instances to remove
if maxNumOfInstancesToProcess >
len(unprocessedInstancesToAdd)+len(unprocessedInstancesToUpdate) {
return unprocessedInstancesToAdd,
unprocessedInstancesToUpdate,
unprocessedInstancesToRemove[:maxNumOfInstancesToProcess-
len(unprocessedInstancesToAdd)-
len(unprocessedInstancesToUpdate)]
}
// if can process all of the instances to add,
// and part of instances to update
if maxNumOfInstancesToProcess > len(unprocessedInstancesToAdd) {
return unprocessedInstancesToAdd,
unprocessedInstancesToUpdate[:maxNumOfInstancesToProcess-len(unprocessedInstancesToAdd)],
nil
}
// if can process part of the instances to add
return unprocessedInstancesToAdd[:maxNumOfInstancesToProcess], nil, nil
}

// sortInstancesByAvailability sorts the instances of the job by their
// availability. The sort order is
// 1. unavailable instances
// 2. killed instances
// 3. invalid instances
// 4. available instances
// This is needed because unhealthy instances should be updated before
// healthy ones, keeping the number of unavailable instances to a
// minimum and giving the update workflow the best chance to make
// progress.
func sortInstancesByAvailability(
ctx context.Context,
cachedJob cached.Job,
instances []uint32,
) []uint32 {
instancesByAvailability := make(map[jobmgrcommon.InstanceAvailability_Type][]uint32)
instanceAvailabilityByInstance := cachedJob.GetInstanceAvailabilityType(ctx, instances...)
for _, i := range instances {
availabilityType := instanceAvailabilityByInstance[i]
instancesByAvailability[availabilityType] = append(
instancesByAvailability[availabilityType],
i,
)
}
var sortedInstances []uint32
sortedInstances = append(
sortedInstances,
instancesByAvailability[jobmgrcommon.InstanceAvailability_UNAVAILABLE]...,
)
sortedInstances = append(
sortedInstances,
instancesByAvailability[jobmgrcommon.InstanceAvailability_KILLED]...,
)
sortedInstances = append(
sortedInstances,
instancesByAvailability[jobmgrcommon.InstanceAvailability_INVALID]...,
)
sortedInstances = append(
sortedInstances,
instancesByAvailability[jobmgrcommon.InstanceAvailability_AVAILABLE]...,
)
return sortedInstances
}

// getUnprocessedInstances returns all of the instances remaining to
// add, update, and remove.
func getUnprocessedInstances(
update cached.Update,
instancesCurrent []uint32,
instancesDone []uint32,
instancesFailed []uint32,
) (instancesRemainToAdd []uint32,
instancesRemainToUpdate []uint32,
instancesRemainToRemove []uint32) {
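	// an instance counts as processed if it is currently being
	// processed, done, or failed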
instancesProcessed := append(instancesCurrent, instancesDone...)
instancesProcessed = append(instancesProcessed, instancesFailed...)
instancesRemainToAdd = util.SubtractSlice(update.GetInstancesAdded(), instancesProcessed)
instancesRemainToUpdate = util.SubtractSlice(update.GetInstancesUpdated(), instancesProcessed)
instancesRemainToRemove = util.SubtractSlice(update.GetInstancesRemoved(), instancesProcessed)
return
}

// updateWithRecentRunID syncs the runID from persistent storage for a
// previously removed instance that is being added back.
//
// 1. It fetches the most recent pod event to get the last runID.
// 2. If a runID exists for the instance, the runtime is set to the
// next runID rather than starting over, which would overwrite the
// previous pod events in storage.
// 3. Continuing from the most recent runID lets users fetch sandbox
// logs and state transitions for previous runs of the instance.
func updateWithRecentRunID(
ctx context.Context,
jobID *peloton.JobID,
instanceID uint32,
runtime *pbtask.RuntimeInfo,
goalStateDriver *driver) error {
podEvents, err := goalStateDriver.podEventsOps.GetAll(
ctx,
jobID.GetValue(),
instanceID)
if err != nil {
return err
}
// instance removed previously during update is being added back.
if len(podEvents) > 0 {
runID, err := util.ParseRunID(podEvents[0].GetTaskId().GetValue())
if err != nil {
return err
}
runtime.MesosTaskId = util.CreateMesosTaskID(
jobID,
instanceID,
runID+1)
runtime.DesiredMesosTaskId = runtime.MesosTaskId
runtime.PrevMesosTaskId = podEvents[0].GetTaskId()
}
return nil
}