in uforwarder/src/main/java/com/uber/data/kafka/consumerproxy/controller/rebalancer/AbstractRpcUriRebalancer.java [480:527]
private Map<Long, Long> ensureWorkerUnderloaded(
    HashBasedTable<Long, Long, RebalancingJob> table) {
  // Accumulated load per worker. The first job observed on a worker is always kept
  // there; subsequent jobs are kept only while the worker stays under capacity.
  Map<Long, Double> loadByWorker = new HashMap<>();
  // jobId -> workerId pairs that must be unpinned because the worker is overloaded.
  Map<Long, Long> overloadedJobs = new HashMap<>();
  for (long jobId : table.rowKeySet()) {
    Map<Long, RebalancingJob> assignment = table.row(jobId);
    long workerId = assignment.keySet().iterator().next();
    if (workerId == WorkerUtils.UNSET_WORKER_ID) {
      // Job is not pinned to any worker; nothing to check.
      continue;
    }
    RebalancingJob job = assignment.get(workerId);
    Preconditions.checkNotNull(
        job,
        "rebalancingJob should not be null because Guava table guarantees >=1 column (worker_id) per row");
    Double currentLoad = loadByWorker.get(workerId);
    if (currentLoad == null) {
      // Pin the first job to its worker no matter whether it is overloaded or not,
      // to stabilize assignment of over-sized jobs and prevent cascading re-assigns.
      loadByWorker.put(workerId, job.getLoad());
    } else {
      double candidateLoad = currentLoad + job.getLoad();
      if (candidateLoad < CAPACITY_PER_WORKER) {
        loadByWorker.put(workerId, candidateLoad);
      } else {
        overloadedJobs.put(jobId, workerId);
      }
    }
  }
  // Unpin each job from its overloaded worker by moving it to the UNSET_WORKER_ID
  // column. If spare workers turn out to be insufficient, ensureJobsAssigned will
  // recover the mapping later. Either way this triggers a status change of the job
  // group; the worker that picks up the change compares the existing job group with
  // the new one to compute which jobs to run or cancel
  // (see AbstractKafkaFetcherThread::extractTopicPartitionMap).
  for (Map.Entry<Long, Long> unpin : overloadedJobs.entrySet()) {
    long jobId = unpin.getKey();
    long workerId = unpin.getValue();
    RebalancingJob job = table.get(jobId, workerId);
    table.remove(jobId, workerId);
    table.put(jobId, WorkerUtils.UNSET_WORKER_ID, job);
    job.setWorkerId(WorkerUtils.UNSET_WORKER_ID);
  }
  return ImmutableMap.copyOf(overloadedJobs);
}