in pkg/jobmgr/cached/job_factory.go [221:350]
func (f *jobFactory) publishMetrics() map[pbtask.TaskState]map[pbtask.TaskState]int {
	stopWatch := f.mtx.scope.Timer("publish_duration").Start()
	defer stopWatch.Stop()

	// Initialize the task count map for every (current state, goal state)
	// pair so that gauges for pairs with no tasks are published as zero.
	tCount := map[pbtask.TaskState]map[pbtask.TaskState]int{}
	for s := range pbtask.TaskState_name {
		tCount[pbtask.TaskState(s)] = map[pbtask.TaskState]int{}
		for gs := range pbtask.TaskState_name {
			tCount[pbtask.TaskState(s)][pbtask.TaskState(gs)] = 0
		}
	}

	// Initialize the workflow count map for all update states.
	workflowCount := map[pbupdate.State]int{}
	for s := range pbupdate.State_name {
		workflowCount[pbupdate.State(s)] = 0
	}

	// Iterate through all cached jobs and aggregate task, workflow,
	// throttling and spread counts.
	jobs := f.GetAllJobs()
	var (
		totalThrottledTasks int
		spreadQuotientSum   float64
		spreadQuotientCount int64
		slaViolatedJobIDs   []string
	)
	for _, j := range jobs {
		unavailableInstances := uint32(0)
		unknownInstances := uint32(0)
		taskStateCount, throttledTasks, spread := j.GetTaskStateCount()
		for stateSummary, count := range taskStateCount {
			currentState := stateSummary.CurrentState
			goalState := stateSummary.GoalState
			healthState := stateSummary.HealthState

			tCount[currentState][goalState] += count

			if currentState == pbtask.TaskState_UNKNOWN ||
				goalState == pbtask.TaskState_UNKNOWN {
				unknownInstances += uint32(count)
				continue
			}

			if goalState == pbtask.TaskState_RUNNING {
				// A RUNNING task whose health check is disabled or passing
				// counts as available; any other task with a RUNNING goal
				// state counts as unavailable.
				if currentState == pbtask.TaskState_RUNNING {
					switch healthState {
					case pbtask.HealthState_DISABLED, pbtask.HealthState_HEALTHY:
						continue
					}
				}
				unavailableInstances += uint32(count)
			}
		}
		totalThrottledTasks += throttledTasks

		workflowStateCount := j.GetWorkflowStateCount()
		for currentState, count := range workflowStateCount {
			workflowCount[currentState] += count
		}

		// Spread quotient for a job is its task count divided by the number
		// of hosts its tasks are placed on.
		if spread.hostCount > 0 {
			spreadQuotientCount++
			spreadQuotientSum += float64(spread.taskCount) / float64(spread.hostCount)
		}

		// SLA is currently defined only for stateless (SERVICE) jobs.
		if j.GetJobType() != pbjob.JobType_SERVICE {
			continue
		}

		jobConfig := j.GetCachedConfig()
		if jobConfig == nil {
			log.WithField("job_id", j.ID().GetValue()).
				Debug("job config not present in cache, skipping SLA metrics for job")
			continue
		}

		if unknownInstances > 0 {
			log.WithFields(log.Fields{
				"job_id":                j.ID().GetValue(),
				"num_unknown_instances": unknownInstances,
			}).Debug("job has instances in unknown state")
		}

		var hasActiveUpdate bool
		for _, w := range j.GetAllWorkflows() {
			if w.GetWorkflowType() == models.WorkflowType_UPDATE &&
				IsUpdateStateActive(w.GetState().State) {
				hasActiveUpdate = true
				break
			}
		}

		// Skip the SLA violation check if the job has an ongoing update,
		// since an update can legitimately take instances down.
		if hasActiveUpdate {
			continue
		}

		if jobConfig.GetSLA().GetMaximumUnavailableInstances() > 0 &&
			unavailableInstances > jobConfig.GetSLA().GetMaximumUnavailableInstances() {
			log.WithField("job_id", j.ID().GetValue()).
				Info("job sla violated")
			slaViolatedJobIDs = append(slaViolatedJobIDs, j.ID().GetValue())
		}
	}

	// Publish the aggregated counts as gauges.
	f.mtx.scope.Gauge("jobs_count").Update(float64(len(jobs)))
	f.mtx.scope.Gauge("sla_violated_jobs").Update(float64(len(slaViolatedJobIDs)))
	f.mtx.scope.Gauge("throttled_tasks").Update(float64(totalThrottledTasks))
	if spreadQuotientCount > 0 {
		f.taskMetrics.MeanSpreadQuotient.
			Update(spreadQuotientSum / float64(spreadQuotientCount))
	}
	for s, sm := range tCount {
		for gs, tc := range sm {
			f.mtx.scope.Tagged(map[string]string{
				"state":      s.String(),
				"goal_state": gs.String(),
			}).Gauge("tasks_count").Update(float64(tc))
		}
	}
	for s, tc := range workflowCount {
		f.mtx.scope.Tagged(map[string]string{
			"state": s.String(),
		}).Gauge("workflow_count").Update(float64(tc))
	}
	return tCount
}
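
For context, a minimal sketch of how a method like this might be driven periodically. The names runPublishMetrics, metricsTick, and stopChan are illustrative assumptions, not taken from this excerpt; only publishMetrics comes from the code above.

// Illustrative sketch only: runPublishMetrics, metricsTick and stopChan are
// hypothetical names, assuming a time.Duration constant metricsTick and a
// stop channel owned by the factory.
func (f *jobFactory) runPublishMetrics(stopChan <-chan struct{}) {
	ticker := time.NewTicker(metricsTick)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			// Recompute and publish all job/task/workflow gauges.
			f.publishMetrics()
		case <-stopChan:
			return
		}
	}
}

A loop of this shape would typically be started in its own goroutine when the factory starts and terminated by closing the stop channel when it stops.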