in uforwarder/src/main/java/com/uber/data/kafka/consumerproxy/controller/rebalancer/RpcJobColocatingRebalancer.java [284:383]
/**
 * Emits gauges describing the current worker/partition layout held in {@code rebalancingCache}.
 *
 * <p>The following are emitted:
 *
 * <ul>
 *   <li>the number of workers (across all partitions) that have at least one job;
 *   <li>per virtual partition: the assigned worker count, the requested worker count, and the
 *       average / max / min / standard deviation of worker load;
 *   <li>per worker: its expected load;
 *   <li>the sum over all partitions of (requested workers - assigned workers);
 *   <li>the total requested worker count after it is passed through {@code
 *       roundUpToNearestNumber} with {@code TARGET_UNIT_NUMBER}, reported under a different
 *       metric name depending on {@code shadowRun}.
 * </ul>
 *
 * @param workerNeededPerPartition requested worker count per partition; it is indexed with the
 *     partition index cast to {@code int}, so it must contain an entry for every partition
 *     returned by {@code rebalancingCache.getAllPartitions()}
 */
private void emitMetrics(List<Integer> workerNeededPerPartition) {
  int totalUsedWorkers = 0;
  for (RebalancingWorkerWithSortedJobs worker : rebalancingCache.getAllWorkers()) {
    if (worker.getNumberOfJobs() != 0) {
      totalUsedWorkers += 1;
    }
  }
  scope.gauge(MetricNames.USED_WORKER_COUNT).update(totalUsedWorkers);

  int totalNumberOfWorkersStillNeeded = 0;
  for (long partitionIdx : rebalancingCache.getAllPartitions()) {
    Map<String, String> partitionTags =
        ImmutableMap.of(VIRTUAL_PARTITION_TAG, Long.toString(partitionIdx));
    List<RebalancingWorkerWithSortedJobs> allWorkersWithinPartition =
        rebalancingCache.getAllWorkersForPartition(partitionIdx);
    int requestedWorkersInPartition = workerNeededPerPartition.get((int) partitionIdx);

    scope
        .subScope(COLOCATING_REBALANCER_SUB_SCOPE)
        .tagged(partitionTags)
        .gauge(MetricNames.ASSIGNED_WORKER_NUMBER_IN_PARTITION)
        .update(allWorkersWithinPartition.size());
    scope
        .subScope(COLOCATING_REBALANCER_SUB_SCOPE)
        .tagged(partitionTags)
        .gauge(MetricNames.REQUESTED_WORKER_NUMBER_IN_PARTITION)
        .update(requestedWorkersInPartition);
    totalNumberOfWorkersStillNeeded +=
        requestedWorkersInPartition - allWorkersWithinPartition.size();

    // Load statistics cover every worker in the partition, including workers without jobs.
    DoubleSummaryStatistics stats = new DoubleSummaryStatistics();
    int usedWorkersInPartition = 0;
    for (RebalancingWorkerWithSortedJobs worker : allWorkersWithinPartition) {
      if (worker.getNumberOfJobs() > 0) {
        usedWorkersInPartition += 1;
      }
      Map<String, String> workerTags =
          ImmutableMap.of(
              VIRTUAL_PARTITION_TAG,
              Long.toString(partitionIdx),
              MetricNames.WORKER_IDX,
              Long.toString(worker.getWorkerId()));
      stats.accept(worker.getLoad());
      scope
          .subScope(COLOCATING_REBALANCER_SUB_SCOPE)
          .tagged(workerTags)
          .gauge(MetricNames.WORKER_EXPECTED_LOAD)
          .update(worker.getLoad());
    }

    // NOTE(review): when the partition has no workers, DoubleSummaryStatistics reports
    // -Infinity for max and +Infinity for min; those values are emitted unchanged here.
    scope
        .subScope(COLOCATING_REBALANCER_SUB_SCOPE)
        .tagged(partitionTags)
        .gauge(MetricNames.WORKER_LOAD_AVG)
        .update(stats.getAverage());
    scope
        .subScope(COLOCATING_REBALANCER_SUB_SCOPE)
        .tagged(partitionTags)
        .gauge(MetricNames.WORKER_LOAD_MAX)
        .update(stats.getMax());
    scope
        .subScope(COLOCATING_REBALANCER_SUB_SCOPE)
        .tagged(partitionTags)
        .gauge(MetricNames.WORKER_LOAD_MIN)
        .update(stats.getMin());

    // Only workers that hold jobs contribute to the spread.
    // NOTE(review): the mean used below is taken over all workers (idle ones included) while
    // the sum and the divisor only cover workers with jobs - confirm this is intended.
    double meanLoad = stats.getAverage();
    double sumOfSquaredDeviations = 0.0;
    for (RebalancingWorkerWithSortedJobs worker : allWorkersWithinPartition) {
      if (worker.getNumberOfJobs() == 0) {
        continue;
      }
      double deviation = worker.getLoad() - meanLoad;
      sumOfSquaredDeviations += deviation * deviation;
    }
    // Take the square root so the gauge really is a standard deviation (not the variance), and
    // report 0 instead of NaN (0.0 / 0) when no worker in the partition has any job.
    double standardDeviation =
        usedWorkersInPartition == 0
            ? 0.0
            : Math.sqrt(sumOfSquaredDeviations / usedWorkersInPartition);
    scope
        .subScope(COLOCATING_REBALANCER_SUB_SCOPE)
        .tagged(partitionTags)
        .gauge(MetricNames.WORKER_LOAD_STD_DEVIATION)
        .update(standardDeviation);
  }

  scope
      .subScope(COLOCATING_REBALANCER_SUB_SCOPE)
      .gauge(MetricNames.EXTRA_WORKERS_UNFULFILLED)
      .update(totalNumberOfWorkersStillNeeded);

  int totalRequestedNumberOfWorker =
      workerNeededPerPartition.stream().mapToInt(Integer::intValue).sum();
  totalRequestedNumberOfWorker =
      roundUpToNearestNumber(totalRequestedNumberOfWorker, TARGET_UNIT_NUMBER);
  if (shadowRun) {
    // if it's shadow, still emit metric for comparison
    scope.gauge(MetricNames.REQUESTED_WORKER_COUNT).update(totalRequestedNumberOfWorker);
  } else {
    scope.gauge(MetricNames.WORKERS_TARGET).update(totalRequestedNumberOfWorker);
  }
}