in uforwarder/src/main/java/com/uber/data/kafka/consumerproxy/controller/rebalancer/AbstractRpcUriRebalancer.java [537:583]
private Map<Long, Long> ensureWorkerGetAssigned(
HashBasedTable<Long, Long, RebalancingJob> table, Set<Long> workerIds) {
HashMap<Long, Long> result = new HashMap<>();
// compute the reserved assignment that respects stable assignment and numWorkersPerUri.
Map<String, SortedSet<RebalancingWorker>> reservedAssignment =
computeWorkersPerUri(table, workerIds);
for (String uri : reservedAssignment.keySet()) {
SortedSet<RebalancingWorker> workersForUri =
reservedAssignment.getOrDefault(uri, new TreeSet<>());
// no workload on the worker
if (workersForUri.size() != 0 && workersForUri.first().getLoad() == 0) {
// reserve workersForUri to make list in decrease order
List<RebalancingWorker> reversedWorkersForUri = new ArrayList<>();
workersForUri.stream().forEachOrdered(worker -> reversedWorkersForUri.add(0, worker));
Long workerToUnload = WorkerUtils.UNSET_WORKER_ID;
for (RebalancingWorker worker : reversedWorkersForUri) {
if (worker.getLoad() == 0) {
continue;
}
Map<Long, RebalancingJob> jobs = table.column(worker.getWorkerId());
if (jobs.size() != 1) {
workerToUnload = worker.getWorkerId();
break;
}
}
// No workers match unload criteria, skip worker unload
if (workerToUnload == WorkerUtils.UNSET_WORKER_ID) {
logger.warn("fail to find worker to unload");
return result;
}
// unpin workload for selected worker
Map<Long, RebalancingJob> jobs = table.column(workerToUnload);
List<Long> jobIds = ImmutableList.copyOf(jobs.keySet());
for (Long jobId : jobIds) {
RebalancingJob job = table.get(jobId, workerToUnload);
table.remove(jobId, workerToUnload);
table.put(jobId, WorkerUtils.UNSET_WORKER_ID, job);
job.setWorkerId(WorkerUtils.UNSET_WORKER_ID);
result.put(jobId, workerToUnload);
}
}
}
return ImmutableMap.copyOf(result);
}