in uforwarder-core/src/main/java/com/uber/data/kafka/datatransfer/controller/manager/WorkerManager.java [47:80]
public void logAndMetrics() {
if (!leaderSelector.isLeader()) {
logger.debug("skipped logAndMetrics because of current instance is not leader");
return;
}
boolean isFailure = false;
try {
Map<Long, Versioned<StoredWorker>> workers = workerStore.getAll();
Map<WorkerState, Integer> stateCountMap = new HashMap<>();
for (Versioned<StoredWorker> worker : workers.values()) {
stateCountMap.merge(
worker.model().getState(), 1, (oldValue, newValue) -> oldValue + newValue);
}
for (Map.Entry<WorkerState, Integer> entry : stateCountMap.entrySet()) {
scope
.tagged(ImmutableMap.of("worker_state", entry.getKey().name()))
.gauge("manager.worker.state.count")
.update((double) entry.getValue());
logger.info(
entry.getKey().toString() + " count",
StructuredLogging.count(entry.getValue()),
StructuredLogging.workerState(entry.getKey()));
}
} catch (Throwable t) {
isFailure = true;
logger.error("worker manager log and metrics heartbeat failed", t);
scope.counter("manager.worker.heartbeat.failed").inc(1);
} finally {
if (!isFailure) {
logger.debug("worker manager log and metrics heartbeat success");
scope.counter("manager.worker.heartbeat.success").inc(1);
}
}
}