private List getLeasesToSteal()

in src/main/java/com/amazonaws/services/dynamodbv2/streamsadapter/leases/StreamsLeaseTaker.java [376:432]


    /**
     * Selects leases currently owned by other workers that this worker should try to steal.
     *
     * <p>Only hosts other than {@code workerIdentifier} whose lease count exceeds {@code target}
     * are considered, and at most their surplus over {@code target} is taken from each of them.
     * The number of leases needed is spread across those hosts using a per-host quota of
     * {@code ceil(needed / hosts)}. Because each host is capped by both its surplus and that
     * quota, the result may contain fewer than {@code needed} leases.
     *
     * <p>The candidate leases are visited in random order; the caller's {@code activeLeases}
     * list is not modified.
     *
     * @param leaseCountsForHosts lease count keyed by lease owner (worker identifier)
     * @param needed maximum number of leases to select; a value {@code <= 0} selects nothing
     * @param target lease count above which a host is considered to hold extra leases
     * @param activeLeases candidate leases; entries with a {@code null} owner are skipped
     * @return a new mutable list with the leases to steal, possibly empty, never {@code null}
     */
    private List<T> getLeasesToSteal(final Map<String, Integer> leaseCountsForHosts, final int needed, final int target, final List<T> activeLeases) {
        final List<T> leasesToSteal = new ArrayList<>();

        if (needed <= 0) {
            return leasesToSteal;
        }

        // Number of extra leases every worker other than me holds over the target.
        // The map is decremented below, so an explicitly mutable HashMap is requested:
        // the two-argument Collectors.toMap makes no promise about mutability of its result.
        // Keys come from a map's entry set and are therefore unique; the merge function is
        // required by the overload but is never expected to be invoked.
        final Map<String, Integer> extraLeasesWithHosts = leaseCountsForHosts.entrySet().stream()
            .filter(entry -> !entry.getKey().equals(workerIdentifier))
            .filter(entry -> entry.getValue() > target)
            .collect(Collectors.toMap(
                Map.Entry::getKey,
                entry -> entry.getValue() - target,
                (first, second) -> first,
                HashMap::new));

        final int numWorkersToStealFrom = extraLeasesWithHosts.size();
        if (numWorkersToStealFrom <= 0) {
            return leasesToSteal;
        }

        // Ceiling of needed / numWorkersToStealFrom: the most that is taken from a single worker.
        final int leaseCountToStealPerWorker =
            needed / numWorkersToStealFrom + ((needed % numWorkersToStealFrom > 0) ? 1 : 0);

        // Number of leases added to leasesToSteal so far, per current owner.
        final Map<String, Integer> leasesAddedSoFar = new HashMap<>();

        // Shuffle so that different workers do not all contend for the same leases.
        // A copy is shuffled so the caller's list keeps its order (and may be unmodifiable).
        final List<T> shuffledLeases = new ArrayList<>(activeLeases);
        Collections.shuffle(shuffledLeases);

        for (T lease : shuffledLeases) {
            final String leaseOwner = lease.getLeaseOwner();
            if (leaseOwner == null) {
                continue;
            }

            final int extraLeasesWithCurrentOwner = extraLeasesWithHosts.getOrDefault(leaseOwner, 0);
            final int addedSoFarFromCurrentOwner = leasesAddedSoFar.getOrDefault(leaseOwner, 0);

            // Take the lease only if its owner still has surplus left and this worker has not
            // yet reached the per-worker quota for that owner.
            if (extraLeasesWithCurrentOwner > 0
                    && addedSoFarFromCurrentOwner < leaseCountToStealPerWorker) {
                extraLeasesWithHosts.put(leaseOwner, extraLeasesWithCurrentOwner - 1);
                leasesAddedSoFar.put(leaseOwner, addedSoFarFromCurrentOwner + 1);
                leasesToSteal.add(lease);
            }

            if (leasesToSteal.size() >= needed) {
                break;
            }
        }

        for (T leaseToSteal : leasesToSteal) {
            LOG.info(String.format(
                "Worker %s needs %d leases. It will steal lease %s from %s",
                workerIdentifier,
                needed,
                leaseToSteal.getLeaseKey(),
                leaseToSteal.getLeaseOwner()));
        }

        LOG.info(String.format("Worker %s will try to steal total %d leases", workerIdentifier, leasesToSteal.size()));
        return leasesToSteal;
    }