private List chooseLeasesToSteal()

in amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTaker.java [511:574]


    private List<Lease> chooseLeasesToSteal(Map<String, Integer> leaseCounts, int needed, int target) {
        List<Lease> leasesToSteal = new ArrayList<>();

        Entry<String, Integer> mostLoadedWorker = null;
        // Find the most loaded worker
        for (Entry<String, Integer> worker : leaseCounts.entrySet()) {
            if (mostLoadedWorker == null || mostLoadedWorker.getValue() < worker.getValue()) {
                mostLoadedWorker = worker;
            }
        }

        int numLeasesToSteal = 0;
        if ((mostLoadedWorker.getValue() >= target) && (needed > 0)) {
            int leasesOverTarget = mostLoadedWorker.getValue() - target;
            numLeasesToSteal = Math.min(needed, leasesOverTarget);
            // steal 1 if we need > 1 and max loaded worker has target leases.
            if ((needed > 1) && (numLeasesToSteal == 0)) {
                numLeasesToSteal = 1;
            }
            numLeasesToSteal = Math.min(numLeasesToSteal, maxLeasesToStealAtOneTime);
        }

        if (numLeasesToSteal <= 0) {
            if (log.isDebugEnabled()) {
                log.debug(String.format("Worker %s not stealing from most loaded worker %s.  He has %d,"
                        + " target is %d, and I need %d",
                        workerIdentifier,
                        mostLoadedWorker.getKey(),
                        mostLoadedWorker.getValue(),
                        target,
                        needed));
            }
            return leasesToSteal;
        } else {
            if (log.isDebugEnabled()) {
                log.debug("Worker {} will attempt to steal {} leases from most loaded worker {}. "
                        + " He has {} leases, target is {}, I need {}, maxLeasesToStealAtOneTime is {}.",
                        workerIdentifier,
                        numLeasesToSteal,
                        mostLoadedWorker.getKey(),
                        mostLoadedWorker.getValue(),
                        target,
                        needed,
                        maxLeasesToStealAtOneTime);
            }
        }

        String mostLoadedWorkerIdentifier = mostLoadedWorker.getKey();
        List<Lease> candidates = new ArrayList<>();
        // Collect leases belonging to that worker
        for (Lease lease : allLeases.values()) {
            if (mostLoadedWorkerIdentifier.equals(lease.leaseOwner())) {
                candidates.add(lease);
            }
        }

        // Return random ones
        Collections.shuffle(candidates);
        int toIndex = Math.min(candidates.size(), numLeasesToSteal);
        leasesToSteal.addAll(candidates.subList(0, toIndex).stream()
                .map(lease -> lease.isMarkedForLeaseSteal(true))
                .collect(Collectors.toList()));
        return leasesToSteal;
    }