synchronized public Map takeLeases()

in src/main/java/com/amazonaws/services/dynamodbv2/streamsadapter/leases/StreamsLeaseTaker.java [84:126]


    synchronized public Map<String, T> takeLeases() throws DependencyException, InvalidStateException {
        long startTime = System.currentTimeMillis();
        refreshAllLeases(SYSTEM_CLOCK_CALLABLE);

        final Set<T> expiredLeases = getExpiredLeases();
        final Map<String, Integer> leaseCountsByHost = getUnfinishedLeaseCountsByHost(expiredLeases);
        final int numWorkers = leaseCountsByHost.size();
        final List<T> allUnfinishedLeases = getUnfinishedLeases(allLeases.values());
        final List<T> allFinishedLeases = getFinishedLeases(allLeases.values());
        final int myUnfinishedLeasesCount = getMyLeaseCount(allUnfinishedLeases, expiredLeases);
        final int myFinishedLeasesCount = getMyLeaseCount(allFinishedLeases, expiredLeases);
        final int targetUnfinishedLeasesCount = getTargetUnfinishedLeasesCount(allUnfinishedLeases.size(), numWorkers);
        final int targetFinishedLeasesCount = getTargetFinishedLeasesCount(allFinishedLeases.size(), numWorkers);

        final int remainingSlotsForUnfinishedLeases = targetUnfinishedLeasesCount - myUnfinishedLeasesCount;
        final int remainingSlotsForFinishedLeases = targetFinishedLeasesCount - myFinishedLeasesCount;
        final List<T> unfinishedExpiredLeases = getUnfinishedLeases(expiredLeases);
        final List<T> leasesToTake = getLeasesToTakeFromExpiredLeases(unfinishedExpiredLeases, remainingSlotsForUnfinishedLeases);
        leasesToTake.addAll(getLeasesToTakeFromExpiredLeases(getFinishedLeases(expiredLeases), remainingSlotsForFinishedLeases));
        leasesToTake.addAll(getLeasesToSteal(leaseCountsByHost, remainingSlotsForUnfinishedLeases - unfinishedExpiredLeases.size(), targetUnfinishedLeasesCount, allUnfinishedLeases));

        LOG.info(String.format("Worker %s saw %d total leases, %d expired leases, %d workers."
                + "Unfinished lease target: %d leases, I have %d unfinished leases. Finished leases target is %d and "
                + "I have %d finished leases. I will take %d leases in total.",
            workerIdentifier,
            allLeases.size(),
            expiredLeases.size(),
            numWorkers,
            targetUnfinishedLeasesCount,
            myUnfinishedLeasesCount,
            targetFinishedLeasesCount,
            myFinishedLeasesCount,
            leasesToTake.size()));

        // Take leases before eviction.
        Map<String, T> takenLeases = takeLeases(leasesToTake);
        // If I have more than my share of finished leases, evict
        if (remainingSlotsForFinishedLeases < 0) {
            evictLeases(getLeasesToEvict(Math.abs(remainingSlotsForFinishedLeases)));
        }
        LOG.info(String.format("TakeLeases took %d seconds.", (System.currentTimeMillis() - startTime) / 1000));
        return takenLeases;
    }