in uforwarder/src/main/java/com/uber/data/kafka/consumerproxy/controller/rebalancer/RpcJobColocatingRebalancer.java [440:532]
/**
 * Emits gauges describing the current state of the given worker table.
 *
 * <p>The following gauges are updated:
 *
 * <ul>
 *   <li>{@code USED_WORKER_COUNT}: number of workers in the table that have at least one job.
 *   <li>Per partition (tagged with the partition index and pod): assigned worker count,
 *       requested worker count, and the average / max / min / standard deviation of worker
 *       load.
 *   <li>Per worker (tagged with the partition index, worker id and pod): the worker's load.
 *   <li>{@code EXTRA_WORKERS_UNFULFILLED} (tagged with pod): sum over all partitions of
 *       (requested workers - assigned workers).
 * </ul>
 *
 * @param rebalancingWorkerTable table holding the partitions and the workers assigned to them
 * @param workerNeededPerPartition requested worker count, indexed by partition index
 * @param pod value for the pod tag attached to the partition and worker level gauges
 */
private void emitMetrics(
    RebalancingWorkerTable rebalancingWorkerTable,
    List<Integer> workerNeededPerPartition,
    String pod) {
  Map<String, String> scopeTagsWithPod = new HashMap<>();
  scopeTagsWithPod.put(POD_TAG, pod);

  int totalUsedWorkers = 0;
  for (RebalancingWorkerWithSortedJobs worker : rebalancingWorkerTable.getAllWorkers()) {
    if (worker.getNumberOfJobs() != 0) {
      totalUsedWorkers += 1;
    }
  }
  // NOTE(review): this gauge is emitted on the root scope without the pod tag, unlike every
  // other gauge in this method -- confirm that is intended.
  scope.gauge(MetricNames.USED_WORKER_COUNT).update(totalUsedWorkers);

  int totalNumberOfWorkersStillNeeded = 0;
  for (long partitionIdx : rebalancingWorkerTable.getAllPartitions()) {
    Map<String, String> partitionTags = new HashMap<>();
    partitionTags.put(VIRTUAL_PARTITION_TAG, Long.toString(partitionIdx));
    partitionTags.putAll(scopeTagsWithPod);

    List<RebalancingWorkerWithSortedJobs> allWorkersWithinPartition =
        rebalancingWorkerTable.getAllWorkersForPartition(partitionIdx);
    int workersAssigned = allWorkersWithinPartition.size();
    int workersRequested = workerNeededPerPartition.get((int) partitionIdx);

    scope
        .subScope(COLOCATING_REBALANCER_SUB_SCOPE)
        .tagged(partitionTags)
        .gauge(MetricNames.ASSIGNED_WORKER_NUMBER_IN_PARTITION)
        .update(workersAssigned);
    scope
        .subScope(COLOCATING_REBALANCER_SUB_SCOPE)
        .tagged(partitionTags)
        .gauge(MetricNames.REQUESTED_WORKER_NUMBER_IN_PARTITION)
        .update(workersRequested);
    totalNumberOfWorkersStillNeeded += workersRequested - workersAssigned;

    // Load statistics cover every worker in the partition, including idle ones.
    DoubleSummaryStatistics stats = new DoubleSummaryStatistics();
    int usedWorkersInPartition = 0;
    for (RebalancingWorkerWithSortedJobs worker : allWorkersWithinPartition) {
      if (worker.getNumberOfJobs() > 0) {
        usedWorkersInPartition += 1;
      }
      Map<String, String> workerTags = new HashMap<>();
      workerTags.put(VIRTUAL_PARTITION_TAG, Long.toString(partitionIdx));
      workerTags.put(MetricNames.WORKER_IDX, Long.toString(worker.getWorkerId()));
      workerTags.put(POD_TAG, pod);
      stats.accept(worker.getLoad());
      scope
          .subScope(COLOCATING_REBALANCER_SUB_SCOPE)
          .tagged(workerTags)
          .gauge(MetricNames.WORKER_EXPECTED_LOAD)
          .update(worker.getLoad());
    }

    // An empty DoubleSummaryStatistics reports max = -Infinity and min = +Infinity; report 0
    // for a partition without workers instead of pushing infinities into the gauges.
    boolean hasWorkers = stats.getCount() > 0;
    double averageLoad = stats.getAverage();
    double maxLoad = hasWorkers ? stats.getMax() : 0.0;
    double minLoad = hasWorkers ? stats.getMin() : 0.0;
    scope
        .subScope(COLOCATING_REBALANCER_SUB_SCOPE)
        .tagged(partitionTags)
        .gauge(MetricNames.WORKER_LOAD_AVG)
        .update(averageLoad);
    scope
        .subScope(COLOCATING_REBALANCER_SUB_SCOPE)
        .tagged(partitionTags)
        .gauge(MetricNames.WORKER_LOAD_MAX)
        .update(maxLoad);
    scope
        .subScope(COLOCATING_REBALANCER_SUB_SCOPE)
        .tagged(partitionTags)
        .gauge(MetricNames.WORKER_LOAD_MIN)
        .update(minLoad);

    // Spread of load across the workers that actually hold jobs, measured against the
    // partition-wide average computed above (which also counts idle workers).
    double sumOfSquaredDeviations = 0.0;
    for (RebalancingWorkerWithSortedJobs worker : allWorkersWithinPartition) {
      if (worker.getNumberOfJobs() == 0) {
        continue;
      }
      double deviation = worker.getLoad() - averageLoad;
      sumOfSquaredDeviations += deviation * deviation;
    }
    // Take the square root so the gauge is a standard deviation rather than a variance, and
    // avoid 0.0 / 0 (NaN) when no worker in the partition has any job.
    double standardDeviation =
        usedWorkersInPartition == 0
            ? 0.0
            : Math.sqrt(sumOfSquaredDeviations / usedWorkersInPartition);
    scope
        .subScope(COLOCATING_REBALANCER_SUB_SCOPE)
        .tagged(partitionTags)
        .gauge(MetricNames.WORKER_LOAD_STD_DEVIATION)
        .update(standardDeviation);
  }

  scope
      .subScope(COLOCATING_REBALANCER_SUB_SCOPE)
      .tagged(scopeTagsWithPod)
      .gauge(MetricNames.EXTRA_WORKERS_UNFULFILLED)
      .update(totalNumberOfWorkersStillNeeded);
}