in src/main/java/com/amazonaws/services/dynamodbv2/streamsadapter/leases/StreamsLeaseTaker.java [84:126]
/**
 * Runs one lease-taking pass for this worker and returns the leases it acquired.
 *
 * <p>The pass proceeds in this order:
 * <ol>
 *   <li>Refresh the local view of all leases and compute the expired ones.</li>
 *   <li>Compute separate targets for unfinished and finished leases, and for each kind the
 *       number of remaining slots (target minus the count this worker currently holds).</li>
 *   <li>Select expired unfinished leases and expired finished leases, passing each kind's own
 *       remaining-slot count to the selection helper.</li>
 *   <li>Select unfinished leases to steal, requesting the unfinished slots that are still open
 *       after subtracting the number of expired unfinished leases.</li>
 *   <li>Take the selected leases and, only afterwards, evict finished leases when this worker
 *       holds more than its finished-lease target.</li>
 * </ol>
 *
 * <p>The method is {@code synchronized}, so the whole pass runs while holding this object's
 * monitor.
 *
 * @return the result of {@code takeLeases(List)} for the selected leases
 *         (presumably keyed by lease key; confirm against that overload)
 * @throws DependencyException declared by this method; propagated from the helpers it calls
 * @throws InvalidStateException declared by this method; propagated from the helpers it calls
 */
public synchronized Map<String, T> takeLeases() throws DependencyException, InvalidStateException {
    // Monotonic clock: the elapsed time stays correct even if the wall clock is adjusted mid-pass.
    final long startNanos = System.nanoTime();

    refreshAllLeases(SYSTEM_CLOCK_CALLABLE);

    final Set<T> expiredLeases = getExpiredLeases();
    final Map<String, Integer> leaseCountsByHost = getUnfinishedLeaseCountsByHost(expiredLeases);
    // The worker count is the number of distinct entries in the per-host count map.
    final int numWorkers = leaseCountsByHost.size();

    final List<T> allUnfinishedLeases = getUnfinishedLeases(allLeases.values());
    final List<T> allFinishedLeases = getFinishedLeases(allLeases.values());

    final int myUnfinishedLeasesCount = getMyLeaseCount(allUnfinishedLeases, expiredLeases);
    final int myFinishedLeasesCount = getMyLeaseCount(allFinishedLeases, expiredLeases);

    final int targetUnfinishedLeasesCount =
            getTargetUnfinishedLeasesCount(allUnfinishedLeases.size(), numWorkers);
    final int targetFinishedLeasesCount =
            getTargetFinishedLeasesCount(allFinishedLeases.size(), numWorkers);

    // A negative value means this worker already holds more than its target for that kind.
    final int remainingSlotsForUnfinishedLeases = targetUnfinishedLeasesCount - myUnfinishedLeasesCount;
    final int remainingSlotsForFinishedLeases = targetFinishedLeasesCount - myFinishedLeasesCount;

    final List<T> unfinishedExpiredLeases = getUnfinishedLeases(expiredLeases);

    // Expired leases first: unfinished, then finished, each with its own slot budget.
    final List<T> leasesToTake =
            getLeasesToTakeFromExpiredLeases(unfinishedExpiredLeases, remainingSlotsForUnfinishedLeases);
    leasesToTake.addAll(
            getLeasesToTakeFromExpiredLeases(getFinishedLeases(expiredLeases), remainingSlotsForFinishedLeases));

    // Only unfinished leases are considered for stealing; the amount requested is what is
    // left of the unfinished budget after accounting for the expired unfinished leases.
    leasesToTake.addAll(getLeasesToSteal(
            leaseCountsByHost,
            remainingSlotsForUnfinishedLeases - unfinishedExpiredLeases.size(),
            targetUnfinishedLeasesCount,
            allUnfinishedLeases));

    LOG.info(String.format("Worker %s saw %d total leases, %d expired leases, %d workers. "
                    + "Unfinished lease target: %d leases, I have %d unfinished leases. Finished leases target is %d and "
                    + "I have %d finished leases. I will take %d leases in total.",
            workerIdentifier,
            allLeases.size(),
            expiredLeases.size(),
            numWorkers,
            targetUnfinishedLeasesCount,
            myUnfinishedLeasesCount,
            targetFinishedLeasesCount,
            myFinishedLeasesCount,
            leasesToTake.size()));

    // Take leases before eviction.
    final Map<String, T> takenLeases = takeLeases(leasesToTake);

    // If I have more than my share of finished leases, evict the surplus.
    if (remainingSlotsForFinishedLeases < 0) {
        evictLeases(getLeasesToEvict(Math.abs(remainingSlotsForFinishedLeases)));
    }

    // Reported in whole seconds (truncated), matching the existing log format.
    final long elapsedSeconds = (System.nanoTime() - startNanos) / 1000000000L;
    LOG.info(String.format("TakeLeases took %d seconds.", elapsedSeconds));
    return takenLeases;
}