public void logAndMetrics()

in uforwarder-core/src/main/java/com/uber/data/kafka/datatransfer/controller/manager/JobManager.java [96:139]


  public void logAndMetrics() {
    if (!leaderSelector.isLeader()) {
      logger.debug("skipped logAndMetrics because of current instance is not leader");
      return;
    }
    boolean isFailure = false;
    try {
      Map<Long, JobSnapshot> jobs =
          jobGroupStore
              .getAll()
              .values()
              .stream()
              .map(g -> g.model().getJobsList())
              .flatMap(Collection::stream)
              .collect(
                  Collectors.toMap(
                      k -> k.getJob().getJobId(),
                      v -> JobSnapshot.newBuilder().setExpectedJob(v).build()));
      Map<JobState, Integer> jobStateCountMap = new HashMap<>();
      for (JobSnapshot job : jobs.values()) {
        jobStateCountMap.merge(
            job.getExpectedJob().getState(), 1, (oldValue, newValue) -> oldValue + newValue);
      }
      for (Map.Entry<JobState, Integer> entry : jobStateCountMap.entrySet()) {
        scope
            .tagged(ImmutableMap.of("job_state", entry.getKey().name()))
            .gauge("manager.job.state.expected.count")
            .update((double) entry.getValue());
        logger.info(
            entry.getKey().toString() + " count",
            StructuredLogging.count(entry.getValue()),
            StructuredLogging.idealState(entry.getKey()));
      }
    } catch (Throwable t) {
      isFailure = true;
      logger.error("job manager log and metrics heartbeat failed", t);
      scope.counter("manager.job.heartbeat.failed").inc(1);
    } finally {
      if (!isFailure) {
        logger.debug("job manager log and metrics heartbeat success");
        scope.counter("manager.job.heartbeat.success").inc(1);
      }
    }
  }