in uforwarder-core/src/main/java/com/uber/data/kafka/datatransfer/controller/manager/JobManager.java [96:139]
public void logAndMetrics() {
if (!leaderSelector.isLeader()) {
logger.debug("skipped logAndMetrics because of current instance is not leader");
return;
}
boolean isFailure = false;
try {
Map<Long, JobSnapshot> jobs =
jobGroupStore
.getAll()
.values()
.stream()
.map(g -> g.model().getJobsList())
.flatMap(Collection::stream)
.collect(
Collectors.toMap(
k -> k.getJob().getJobId(),
v -> JobSnapshot.newBuilder().setExpectedJob(v).build()));
Map<JobState, Integer> jobStateCountMap = new HashMap<>();
for (JobSnapshot job : jobs.values()) {
jobStateCountMap.merge(
job.getExpectedJob().getState(), 1, (oldValue, newValue) -> oldValue + newValue);
}
for (Map.Entry<JobState, Integer> entry : jobStateCountMap.entrySet()) {
scope
.tagged(ImmutableMap.of("job_state", entry.getKey().name()))
.gauge("manager.job.state.expected.count")
.update((double) entry.getValue());
logger.info(
entry.getKey().toString() + " count",
StructuredLogging.count(entry.getValue()),
StructuredLogging.idealState(entry.getKey()));
}
} catch (Throwable t) {
isFailure = true;
logger.error("job manager log and metrics heartbeat failed", t);
scope.counter("manager.job.heartbeat.failed").inc(1);
} finally {
if (!isFailure) {
logger.debug("job manager log and metrics heartbeat success");
scope.counter("manager.job.heartbeat.success").inc(1);
}
}
}