in src/main/java/com/amazonaws/services/dynamodbv2/streamsadapter/leases/StreamsLeaseTaker.java [376:432]
/**
 * Selects the leases this worker should try to steal from workers that hold more than
 * {@code target} leases.
 *
 * <p>Every worker other than {@code workerIdentifier} whose count in {@code leaseCountsForHosts}
 * exceeds {@code target} is treated as a donor, and at most (count - target) leases are taken
 * from it. The {@code needed} leases are spread across donors by capping each donor at
 * ceil(needed / number of donors). Because of that per-donor cap, fewer than {@code needed}
 * leases may be returned when some donors have fewer spare leases than the cap.
 *
 * @param leaseCountsForHosts lease counts keyed by worker identifier
 * @param needed number of leases this worker wants; zero or negative yields an empty list
 * @param target lease count above which a worker is considered to have spare leases
 * @param activeLeases candidate leases; entries with a null owner are skipped. The list itself
 *        is not modified.
 * @return the leases selected for stealing; never null, possibly empty
 */
private List<T> getLeasesToSteal(final Map<String, Integer> leaseCountsForHosts, final int needed, final int target, final List<T> activeLeases) {
    final List<T> leasesToSteal = new ArrayList<>();
    if (needed <= 0) {
        return leasesToSteal;
    }

    // Number of leases already added to leasesToSteal, per donor worker.
    final Map<String, Integer> leasesAddedSoFar = new HashMap<>();
    // Number of leases each worker other than me still holds above the target.
    final Map<String, Integer> extraLeasesWithHosts = leaseCountsForHosts.entrySet().stream()
            .filter(entry -> !entry.getKey().equals(workerIdentifier))
            .filter(entry -> entry.getValue() > target)
            .collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue() - target));

    final int numWorkersToStealFrom = extraLeasesWithHosts.size();
    if (numWorkersToStealFrom <= 0) {
        return leasesToSteal;
    }

    // Ceiling of needed / numWorkersToStealFrom: the most we take from any single donor.
    final int leaseCountToStealPerWorker =
            needed / numWorkersToStealFrom + ((needed % numWorkersToStealFrom > 0) ? 1 : 0);

    // Shuffle so that workers do not all contend for the same leases. A private copy is shuffled
    // so the caller's list is neither reordered nor required to be modifiable.
    final List<T> shuffledLeases = new ArrayList<>(activeLeases);
    Collections.shuffle(shuffledLeases);

    for (T lease : shuffledLeases) {
        final String leaseOwner = lease.getLeaseOwner();
        if (leaseOwner == null) {
            continue;
        }
        final int extraLeasesWithCurrentOwner = extraLeasesWithHosts.getOrDefault(leaseOwner, 0);
        final int addedSoFarFromCurrentOwner = leasesAddedSoFar.getOrDefault(leaseOwner, 0);
        // Take the lease only if its owner still has spare leases and we have not yet reached
        // the per-donor cap for that owner.
        if (extraLeasesWithCurrentOwner > 0
                && addedSoFarFromCurrentOwner < leaseCountToStealPerWorker) {
            extraLeasesWithHosts.put(leaseOwner, extraLeasesWithCurrentOwner - 1);
            leasesAddedSoFar.put(leaseOwner, addedSoFarFromCurrentOwner + 1);
            leasesToSteal.add(lease);
            // The result only grows here, so this is the only place the quota can be reached.
            if (leasesToSteal.size() >= needed) {
                break;
            }
        }
    }

    for (T leaseToSteal : leasesToSteal) {
        LOG.info(String.format(
                "Worker %s needs %d leases. It will steal lease %s from %s",
                workerIdentifier,
                needed,
                leaseToSteal.getLeaseKey(),
                leaseToSteal.getLeaseOwner()));
    }
    LOG.info(String.format("Worker %s will try to steal total %d leases", workerIdentifier, leasesToSteal.size()));
    return leasesToSteal;
}