in pkg/jobmgr/cached/job.go [1583:1721]
// filterRuntimeDiffsBySLA walks the given runtimeDiffs and decides, instance
// by instance, whether the diff may be applied. It returns
//  1. the subset of runtimeDiffs that should be patched,
//  2. the IDs of instances that should be retried because the cached current
//     or goal state of the task is still UNKNOWN, and
//  3. an error if the instance availability info or the current job config
//     could not be obtained.
//
// A diff whose termination reason is host maintenance or SLA aware restart is
// dropped (neither patched nor retried) when the job defines a non-zero
// MaximumUnavailableInstances, the instance is not already unavailable, and
// the number of unavailable instances has already reached that limit.
//
// The job lock is held for the whole call, and the killed/unavailable sets
// returned by getInstanceAvailabilityInfo are updated in place to reflect
// every accepted diff.
func (j *job) filterRuntimeDiffsBySLA(
	ctx context.Context,
	runtimeDiffs map[uint32]jobmgrcommon.RuntimeDiff,
) (map[uint32]jobmgrcommon.RuntimeDiff, []uint32, error) {
	j.Lock()
	defer j.Unlock()

	availability, err := j.getInstanceAvailabilityInfo(ctx)
	if err != nil {
		return nil, nil, err
	}

	if cfgErr := j.populateCurrentJobConfig(ctx); cfgErr != nil {
		return nil, nil, errors.Wrap(cfgErr, "failed to populate job config")
	}

	log.WithFields(log.Fields{
		"killed_instances":      availability.killedInstances,
		"unavailable_instances": availability.unavailableInstances,
		"runtime_diffs":         runtimeDiffs,
	}).Debug("instance availability before patch")

	// markKilled moves the instance from the unavailable set to the killed set.
	markKilled := func(id uint32) {
		delete(availability.unavailableInstances, id)
		availability.killedInstances[id] = true
	}

	// markUnavailable moves the instance from the killed set to the
	// unavailable set.
	markUnavailable := func(id uint32) {
		delete(availability.killedInstances, id)
		availability.unavailableInstances[id] = true
	}

	accepted := make(map[uint32]jobmgrcommon.RuntimeDiff)
	var retry []uint32

	for id, diff := range runtimeDiffs {
		cachedTask := j.tasks[id]
		current := cachedTask.CurrentState()
		goal := cachedTask.GoalState()

		// An explicit goal state of DELETED or KILLED is always honoured.
		requestedGoal, hasRequestedGoal := diff[jobmgrcommon.GoalStateField]
		if hasRequestedGoal {
			switch requestedGoal.(pbtask.TaskState) {
			case pbtask.TaskState_DELETED:
				// A deleted instance is tracked in neither set.
				delete(availability.killedInstances, id)
				delete(availability.unavailableInstances, id)
				accepted[id] = diff
				continue
			case pbtask.TaskState_KILLED:
				markKilled(id)
				accepted[id] = diff
				continue
			}
		}

		// When a termination status is supplied, its reason decides how the
		// instance is classified.
		if rawStatus := diff[jobmgrcommon.TerminationStatusField]; rawStatus != nil {
			switch rawStatus.(*pbtask.TerminationStatus).GetReason() {
			case pbtask.TerminationStatus_TERMINATION_STATUS_REASON_KILLED_HOST_MAINTENANCE,
				pbtask.TerminationStatus_TERMINATION_STATUS_REASON_KILLED_FOR_SLA_AWARE_RESTART:
				// Restart/kill caused by host maintenance: skip the diff
				// if an SLA is defined (non-zero limit) and taking down one
				// more instance would violate it.
				maxUnavailable := j.config.GetSLA().GetMaximumUnavailableInstances()
				if maxUnavailable != 0 &&
					!availability.unavailableInstances[id] &&
					uint32(len(availability.unavailableInstances)) >= maxUnavailable {
					continue
				}
				markUnavailable(id)

			case pbtask.TerminationStatus_TERMINATION_STATUS_REASON_KILLED_FOR_UPDATE,
				pbtask.TerminationStatus_TERMINATION_STATUS_REASON_FAILED:
				// Restart/kill caused by a job update, or the instance
				// failed: the instance becomes unavailable.
				markUnavailable(id)

			default:
				markKilled(id)
			}

			accepted[id] = diff
			continue
		}

		// Without a known current and goal state the effect of the diff on
		// availability cannot be judged yet; hand the instance back for retry.
		if current.State == pbtask.TaskState_UNKNOWN ||
			goal.State == pbtask.TaskState_UNKNOWN {
			retry = append(retry, id)
			continue
		}

		desiredTaskID := diff[jobmgrcommon.DesiredMesosTaskIDField]
		desiredVersion, hasDesiredVersion := diff[jobmgrcommon.DesiredConfigVersionField]

		switch {
		case desiredTaskID != nil &&
			desiredTaskID.(*mesos.TaskID).GetValue() != goal.MesosTaskID.GetValue():
			// The desired mesos-task-id changes without a termination
			// status, so treat the instance as KILLED. Ideally this path is
			// never hit because the termination status is always set when
			// the desired mesos task changes.
			markKilled(id)

		case hasDesiredVersion && desiredVersion.(uint64) != goal.ConfigVersion:
			// The desired config version changes, so the instance is marked
			// KILLED. The one exception is when config version and desired
			// config version are set to the same value (unchanged instances
			// in an update), which does not affect availability.
			newVersion, hasNewVersion := diff[jobmgrcommon.ConfigVersionField]
			if !hasNewVersion || newVersion.(uint64) != desiredVersion.(uint64) {
				markKilled(id)
			}

		default:
			// The goal state is being set to RUNNING, or already is RUNNING:
			// mark the instance unavailable. If that turns out to be wrong
			// (say when only HealthState is updated), it is corrected once
			// the task has been patched and the availability info is
			// refreshed.
			if (hasRequestedGoal && requestedGoal.(pbtask.TaskState) == pbtask.TaskState_RUNNING) ||
				goal.State == pbtask.TaskState_RUNNING {
				markUnavailable(id)
			}
		}

		accepted[id] = diff
	}

	log.WithFields(log.Fields{
		"killed_instances":          availability.killedInstances,
		"unavailable_instances":     availability.unavailableInstances,
		"max_unavailable_instances": j.config.GetSLA().GetMaximumUnavailableInstances(),
	}).Debug("instance availability after change")

	return accepted, retry, nil
}