private Map ensureWorkerGetAssigned()

in uforwarder/src/main/java/com/uber/data/kafka/consumerproxy/controller/rebalancer/AbstractRpcUriRebalancer.java [537:583]


  /**
   * Unpins the jobs of one loaded worker per URI when that URI has a worker with no load, so that
   * the released jobs can be assigned again.
   *
   * <p>For every worker set returned by {@code computeWorkersPerUri}: if the set is non-empty and
   * its first worker reports a load of zero, the workers are scanned from last to first and the
   * first worker that has non-zero load and whose column in {@code table} does not contain exactly
   * one job is selected. Every job in the selected worker's column is moved to the {@code
   * WorkerUtils.UNSET_WORKER_ID} column and has its worker id reset to that value.
   *
   * <p>If a URI with an unloaded first worker has no worker matching the selection criteria, a
   * warning is logged and the method returns immediately with the jobs unpinned so far.
   *
   * @param table jobs indexed by job id (row) and worker id (column); modified in place
   * @param workerIds worker ids forwarded to {@code computeWorkersPerUri}
   * @return an immutable map from each unpinned job id to the worker id it was removed from
   */
  private Map<Long, Long> ensureWorkerGetAssigned(
      HashBasedTable<Long, Long, RebalancingJob> table, Set<Long> workerIds) {
    Map<Long, Long> result = new HashMap<>();
    // compute the reserved assignment that respects stable assignment and numWorkersPerUri.
    Map<String, SortedSet<RebalancingWorker>> reservedAssignment =
        computeWorkersPerUri(table, workerIds);
    for (SortedSet<RebalancingWorker> workersForUri : reservedAssignment.values()) {
      // Only act when the first worker of the sorted set carries no workload.
      if (workersForUri.isEmpty() || workersForUri.first().getLoad() != 0) {
        continue;
      }

      // Walk the workers in reverse of the set's order (presumably heaviest load first; the
      // ordering is defined by RebalancingWorker, which is not visible here).
      List<RebalancingWorker> orderedWorkers = new ArrayList<>(workersForUri);
      // Primitive long so the comparison below is numeric rather than a reference comparison.
      long workerToUnload = WorkerUtils.UNSET_WORKER_ID;
      for (int i = orderedWorkers.size() - 1; i >= 0; i--) {
        RebalancingWorker worker = orderedWorkers.get(i);
        if (worker.getLoad() == 0) {
          continue;
        }
        // A worker holding exactly one job is never selected for unloading.
        if (table.column(worker.getWorkerId()).size() != 1) {
          workerToUnload = worker.getWorkerId();
          break;
        }
      }

      // No workers match unload criteria, skip worker unload
      if (workerToUnload == WorkerUtils.UNSET_WORKER_ID) {
        logger.warn("fail to find worker to unload");
        // NOTE(review): this stops processing of all remaining URIs, not just the current one;
        // confirm that is intended.
        return ImmutableMap.copyOf(result);
      }

      // unpin workload for selected worker; iterate over a snapshot because table.column() is a
      // live view that changes as entries are removed.
      Map<Long, RebalancingJob> jobsToUnpin = ImmutableMap.copyOf(table.column(workerToUnload));
      for (Map.Entry<Long, RebalancingJob> entry : jobsToUnpin.entrySet()) {
        Long jobId = entry.getKey();
        RebalancingJob job = entry.getValue();
        table.remove(jobId, workerToUnload);
        table.put(jobId, WorkerUtils.UNSET_WORKER_ID, job);
        job.setWorkerId(WorkerUtils.UNSET_WORKER_ID);
        result.put(jobId, workerToUnload);
      }
    }
    return ImmutableMap.copyOf(result);
  }