func (j *job) filterRuntimeDiffsBySLA in pkg/jobmgr/cached/job.go [1583:1721]


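// filterRuntimeDiffsBySLA filters the given runtime diffs so that patching
// them will not violate the job's availability SLA
// (MaximumUnavailableInstances). It returns the diffs that are safe to
// patch along with the instances whose state is unknown and should be
// retried.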
func (j *job) filterRuntimeDiffsBySLA(
	ctx context.Context,
	runtimeDiffs map[uint32]jobmgrcommon.RuntimeDiff,
) (map[uint32]jobmgrcommon.RuntimeDiff, []uint32, error) {
	j.Lock()
	defer j.Unlock()

	instanceAvailabilityInfo, err := j.getInstanceAvailabilityInfo(ctx)
	if err != nil {
		return nil, nil, err
	}

	if err = j.populateCurrentJobConfig(ctx); err != nil {
		return nil, nil, errors.Wrap(err, "failed to populate job config")
	}

	log.WithFields(log.Fields{
		"killed_instances":      instanceAvailabilityInfo.killedInstances,
		"unavailable_instances": instanceAvailabilityInfo.unavailableInstances,
		"runtime_diffs":         runtimeDiffs,
	}).Debug("instance availability before patch")

	runtimesToPatch := make(map[uint32]jobmgrcommon.RuntimeDiff)
	var instancesToBeRetried []uint32
	for i, runtimeDiff := range runtimeDiffs {
		t := j.tasks[i]
		taskCurrentState := t.CurrentState()
		taskGoalState := t.GoalState()
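		// If the instance is being deleted, it no longer counts against the
		// SLA; drop it from both availability sets and let the diff through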
		if goalState, ok := runtimeDiff[jobmgrcommon.GoalStateField]; ok &&
			goalState.(pbtask.TaskState) == pbtask.TaskState_DELETED {

			delete(instanceAvailabilityInfo.killedInstances, i)
			delete(instanceAvailabilityInfo.unavailableInstances, i)

			runtimesToPatch[i] = runtimeDiff
			continue
		}

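		// If the instance is being killed explicitly, move it from the
		// unavailable set to the killed set and let the diff through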
		if goalState, ok := runtimeDiff[jobmgrcommon.GoalStateField]; ok &&
			goalState.(pbtask.TaskState) == pbtask.TaskState_KILLED {

			delete(instanceAvailabilityInfo.unavailableInstances, i)
			instanceAvailabilityInfo.killedInstances[i] = true

			runtimesToPatch[i] = runtimeDiff
			continue
		}

		if termStatus := runtimeDiff[jobmgrcommon.TerminationStatusField]; termStatus != nil {
			reason := termStatus.(*pbtask.TerminationStatus).GetReason()

			switch reason {
			// If the restart/kill is due to host maintenance or an
			// SLA-aware restart, skip it if it would violate the SLA
			case pbtask.TerminationStatus_TERMINATION_STATUS_REASON_KILLED_HOST_MAINTENANCE,
				pbtask.TerminationStatus_TERMINATION_STATUS_REASON_KILLED_FOR_SLA_AWARE_RESTART:
				if j.config.GetSLA().GetMaximumUnavailableInstances() != 0 {
					// if an SLA is defined (i.e. MaximumUnavailableInstances
					// is non-zero), check for an SLA violation
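					// e.g. with MaximumUnavailableInstances=1 and one other
					// instance already unavailable, this kill is skipped and
					// its diff is left out of runtimesToPatch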
					if !instanceAvailabilityInfo.unavailableInstances[i] &&
						uint32(len(instanceAvailabilityInfo.unavailableInstances)) >=
							j.config.GetSLA().GetMaximumUnavailableInstances() {
						continue
					}
				}

				delete(instanceAvailabilityInfo.killedInstances, i)
				instanceAvailabilityInfo.unavailableInstances[i] = true

			// If the restart/kill is due to a job update, or the instance
			// has failed, mark the instance unavailable; these cases are
			// not gated on the SLA
			case pbtask.TerminationStatus_TERMINATION_STATUS_REASON_KILLED_FOR_UPDATE,
				pbtask.TerminationStatus_TERMINATION_STATUS_REASON_FAILED:
				delete(instanceAvailabilityInfo.killedInstances, i)
				instanceAvailabilityInfo.unavailableInstances[i] = true

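			// Any other termination reason counts as an explicit kill, so
			// move the instance to the killed set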
			default:
				delete(instanceAvailabilityInfo.unavailableInstances, i)
				instanceAvailabilityInfo.killedInstances[i] = true
			}

			runtimesToPatch[i] = runtimeDiff
			continue
		}

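		// The cached task state hasn't been reconciled yet, so hand this
		// instance back to the caller to be retried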
		if taskCurrentState.State == pbtask.TaskState_UNKNOWN ||
			taskGoalState.State == pbtask.TaskState_UNKNOWN {
			instancesToBeRetried = append(instancesToBeRetried, i)
			continue
		}

		if desiredMesosTaskID := runtimeDiff[jobmgrcommon.DesiredMesosTaskIDField]; desiredMesosTaskID != nil &&
			desiredMesosTaskID.(*mesos.TaskID).GetValue() != taskGoalState.MesosTaskID.GetValue() {

			// If the desired mesos-task-id is being modified and
			// the termination status is not set, mark the instance KILLED.
			// Ideally we shouldn't hit this path since we always set the
			// termination status when changing the desired mesos task
			delete(instanceAvailabilityInfo.unavailableInstances, i)
			instanceAvailabilityInfo.killedInstances[i] = true

		} else if desiredConfigVersion, ok := runtimeDiff[jobmgrcommon.DesiredConfigVersionField]; ok &&
			desiredConfigVersion.(uint64) != taskGoalState.ConfigVersion {
			// if the desired config version is being changed, we need to
			// mark the instance as KILLED. The only exception is when both
			// the config version and the desired config version are being
			// set to the same value (for unchanged instances in an update),
			// since that doesn't affect instance availability
			if configVersion, ok := runtimeDiff[jobmgrcommon.ConfigVersionField]; !ok ||
				configVersion.(uint64) != desiredConfigVersion.(uint64) {
				delete(instanceAvailabilityInfo.unavailableInstances, i)
				instanceAvailabilityInfo.killedInstances[i] = true
			}

		} else {
			goalState, ok := runtimeDiff[jobmgrcommon.GoalStateField]
			if (ok && goalState.(pbtask.TaskState) == pbtask.TaskState_RUNNING) ||
				taskGoalState.State == pbtask.TaskState_RUNNING {
				// if the task's goal state is being set to RUNNING, or is
				// already RUNNING, mark the instance unavailable. If it is
				// incorrectly marked unavailable (say when only HealthState
				// is being updated), it'll be corrected once the task has
				// been patched (when instanceAvailabilityInfo is updated)
				delete(instanceAvailabilityInfo.killedInstances, i)
				instanceAvailabilityInfo.unavailableInstances[i] = true
			}
		}

		runtimesToPatch[i] = runtimeDiff
	}

	log.WithFields(log.Fields{
		"killed_instances":          instanceAvailabilityInfo.killedInstances,
		"unavailable_instances":     instanceAvailabilityInfo.unavailableInstances,
		"max_unavailable_instances": j.config.GetSLA().GetMaximumUnavailableInstances(),
	}).Debug("instance availability after filtering by SLA")

	return runtimesToPatch, instancesToBeRetried, nil
}
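
For illustration, the SLA gate at the heart of the loop above can be sketched in isolation. The helper below is hypothetical (canMarkUnavailable is not part of job.go); it assumes the unavailable set is a plain map standing in for instanceAvailabilityInfo.unavailableInstances, and shows when a new unavailability would be admitted under MaximumUnavailableInstances:

package main

import "fmt"

// canMarkUnavailable mirrors the gate in filterRuntimeDiffsBySLA: an
// instance may be newly marked unavailable only if it is already in the
// unavailable set, or if the set is still below the SLA threshold.
// A zero maxUnavailable means no SLA is enforced.
func canMarkUnavailable(
	unavailable map[uint32]bool,
	instance uint32,
	maxUnavailable uint32,
) bool {
	if maxUnavailable == 0 {
		return true // no SLA defined; never skip
	}
	if unavailable[instance] {
		return true // already unavailable; marking again changes nothing
	}
	return uint32(len(unavailable)) < maxUnavailable
}

func main() {
	unavailable := map[uint32]bool{0: true}
	fmt.Println(canMarkUnavailable(unavailable, 0, 1)) // true: instance 0 is already unavailable
	fmt.Println(canMarkUnavailable(unavailable, 1, 1)) // false: would exceed the SLA, so the diff is skipped
	fmt.Println(canMarkUnavailable(unavailable, 1, 2)) // true: still under the threshold
}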