publishMetrics()

in pkg/jobmgr/cached/job_factory.go [221:350]

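// publishMetrics walks every job in the cache, tallies task counts by
// (state, goal state) pair and workflow counts by update state, publishes
// the aggregates as gauges, and returns the task count matrix.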
func (f *jobFactory) publishMetrics() map[pbtask.TaskState]map[pbtask.TaskState]int {
	stopWatch := f.mtx.scope.Timer("publish_duration").Start()
	defer stopWatch.Stop()

	// Initialize the task count map for all possible (state, goal_state) pairs
	tCount := map[pbtask.TaskState]map[pbtask.TaskState]int{}
	for s := range pbtask.TaskState_name {
		tCount[pbtask.TaskState(s)] = map[pbtask.TaskState]int{}
		for gs := range pbtask.TaskState_name {
			tCount[pbtask.TaskState(s)][pbtask.TaskState(gs)] = 0
		}
	}

	// Initialize the workflow count map for all update states
	workflowCount := map[pbupdate.State]int{}
	for s := range pbupdate.State_name {
		workflowCount[pbupdate.State(s)] = 0
	}

	// Iterate through all cached jobs and aggregate task and workflow counts
	jobs := f.GetAllJobs()
	var (
		totalThrottledTasks int
		spreadQuotientSum   float64
		spreadQuotientCount int64
		slaViolatedJobIDs   []string
	)
	for _, j := range jobs {
		unavailableInstances := uint32(0)
		unknownInstances := uint32(0)

		taskStateCount, throttledTasks, spread := j.GetTaskStateCount()
		for stateSummary, count := range taskStateCount {
			currentState := stateSummary.CurrentState
			goalState := stateSummary.GoalState
			healthState := stateSummary.HealthState

			tCount[currentState][goalState] += count

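			// Instances whose current or goal state is UNKNOWN are counted
			// separately and excluded from the availability math below.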
			if currentState == pbtask.TaskState_UNKNOWN ||
				goalState == pbtask.TaskState_UNKNOWN {
				unknownInstances += uint32(count)
				continue
			}

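			// An instance with goal state RUNNING is considered available only
			// when it is currently RUNNING and its health is DISABLED (health
			// checks off) or HEALTHY; anything else counts as unavailable.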
			if goalState == pbtask.TaskState_RUNNING {
				if currentState == pbtask.TaskState_RUNNING {
					switch healthState {
					case pbtask.HealthState_DISABLED, pbtask.HealthState_HEALTHY:
						continue
					}
				}
				unavailableInstances += uint32(count)
			}
		}

	totalThrottledTasks += throttledTasks

		workflowStateCount := j.GetWorkflowStateCount()
		for currentState, count := range workflowStateCount {
			workflowCount[currentState] += count
		}

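		// For jobs placed on at least one host, accumulate tasks-per-host so
		// the mean spread quotient can be published after the loop.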
		if spread.hostCount > 0 {
			spreadQuotientCount++
			spreadQuotientSum +=
				float64(spread.taskCount) / float64(spread.hostCount)
		}

		// SLA is currently defined only for stateless (service) jobs
		if j.GetJobType() != pbjob.JobType_SERVICE {
			continue
		}

		jobConfig := j.GetCachedConfig()
		if jobConfig == nil {
			log.WithField("job_id", j.ID().GetValue()).
				Debug("job config not present in cache, skipping SLA metrics for job")
			continue
		}

		if unknownInstances > 0 {
			log.WithFields(log.Fields{
				"job_id":                j.ID().GetValue(),
				"num_unknown_instances": unknownInstances,
			}).Debug("job has instances in unknown state")
		}

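		// Check whether the job has an update workflow in an active state.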
		var hasActiveUpdate bool
		for _, w := range j.GetAllWorkflows() {
			if w.GetWorkflowType() == models.WorkflowType_UPDATE &&
				IsUpdateStateActive(w.GetState().State) {
				hasActiveUpdate = true
				break
			}
		}

		// Skip the SLA violation check if the job has an ongoing update
		if hasActiveUpdate {
			continue
		}

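		// Record an SLA violation when unavailable instances exceed the
		// job's configured maximum.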
		if jobConfig.GetSLA().GetMaximumUnavailableInstances() > 0 &&
			unavailableInstances > jobConfig.GetSLA().GetMaximumUnavailableInstances() {
			log.WithField("job_id", j.ID().GetValue()).
				Info("job sla violated")
			slaViolatedJobIDs = append(slaViolatedJobIDs, j.ID().GetValue())
		}
	}

	// Publish the aggregated counts as gauges
	f.mtx.scope.Gauge("jobs_count").Update(float64(len(jobs)))
	f.mtx.scope.Gauge("sla_violated_jobs").Update(float64(len(slaViolatedJobIDs)))
	f.mtx.scope.Gauge("throttled_tasks").Update(float64(totalThrottledTasks))
	if spreadQuotientCount > 0 {
		f.taskMetrics.MeanSpreadQuotient.
			Update(spreadQuotientSum / float64(spreadQuotientCount))
	}
	for s, sm := range tCount {
		for gs, tc := range sm {
			f.mtx.scope.Tagged(map[string]string{
				"state":      s.String(),
				"goal_state": gs.String(),
			}).Gauge("tasks_count").Update(float64(tc))
		}
	}

	for s, tc := range workflowCount {
		f.mtx.scope.Tagged(map[string]string{"state": s.String()}).
			Gauge("workflow_count").Update(float64(tc))
	}

	return tCount
}
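
A minimal sketch of how a caller might drive this method on a fixed interval. The runPublisher helper, its stopChan parameter, and the 10-second period are illustrative assumptions, not part of job_factory.go; only publishMetrics itself comes from the listing above.

// runPublisher is a hypothetical driver loop (assumes `import "time"`):
// it calls publishMetrics once per tick until stopChan is closed.
func (f *jobFactory) runPublisher(stopChan <-chan struct{}) {
	ticker := time.NewTicker(10 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			f.publishMetrics()
		case <-stopChan:
			return
		}
	}
}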