public void logAndMetrics()

in uforwarder-core/src/main/java/com/uber/data/kafka/datatransfer/controller/manager/WorkerManager.java [47:80]


  /**
   * Publishes a per-state worker count as gauges and log lines, then records whether the pass
   * succeeded.
   *
   * <p>When this instance is not the leader, only a debug line is logged and nothing else happens.
   * Otherwise every worker returned by {@code workerStore.getAll()} is tallied by its state. For
   * each state that has at least one worker, the {@code manager.worker.state.count} gauge (tagged
   * with {@code worker_state}) is updated and an info line is logged.
   *
   * <p>Any {@link Throwable} raised while doing so is logged and counted under {@code
   * manager.worker.heartbeat.failed} rather than propagated; a clean pass increments {@code
   * manager.worker.heartbeat.success}.
   */
  public void logAndMetrics() {
    if (!leaderSelector.isLeader()) {
      logger.debug("skipped logAndMetrics because of current instance is not leader");
      return;
    }

    try {
      Map<Long, Versioned<StoredWorker>> storedWorkers = workerStore.getAll();

      // Tally how many workers are currently in each state.
      Map<WorkerState, Integer> countsByState = new HashMap<>();
      for (Versioned<StoredWorker> versionedWorker : storedWorkers.values()) {
        WorkerState state = versionedWorker.model().getState();
        countsByState.merge(state, 1, Integer::sum);
      }

      // Emit one gauge update and one log line per state that was observed.
      for (Map.Entry<WorkerState, Integer> stateCount : countsByState.entrySet()) {
        WorkerState state = stateCount.getKey();
        // Kept boxed so the same StructuredLogging.count overload is selected as before.
        Integer count = stateCount.getValue();

        scope
            .tagged(ImmutableMap.of("worker_state", state.name()))
            .gauge("manager.worker.state.count")
            .update(count.doubleValue());
        logger.info(
            state.toString() + " count",
            StructuredLogging.count(count),
            StructuredLogging.workerState(state));
      }
    } catch (Throwable t) {
      // Failure path: report it and bail out so the success counter below is not touched.
      logger.error("worker manager log and metrics heartbeat failed", t);
      scope.counter("manager.worker.heartbeat.failed").inc(1);
      return;
    }

    // Reached only when the whole pass completed without throwing.
    logger.debug("worker manager log and metrics heartbeat success");
    scope.counter("manager.worker.heartbeat.success").inc(1);
  }