in amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTaker.java [511:574]
/**
 * Picks leases to steal from the worker that currently holds the most leases.
 *
 * <p>Selection rules, in order:
 * <ul>
 *   <li>The victim is the entry of {@code leaseCounts} with the highest count; on a tie the entry
 *       encountered first in the map's iteration order wins.</li>
 *   <li>Nothing is stolen unless {@code needed > 0} and the victim holds at least {@code target}
 *       leases.</li>
 *   <li>The number to steal is {@code min(needed, victimCount - target)}; if that is zero but
 *       {@code needed > 1}, one lease is stolen anyway. The result is capped by
 *       {@code maxLeasesToStealAtOneTime}.</li>
 *   <li>The victim's leases are looked up in {@code allLeases}, shuffled, and the first ones are
 *       returned after passing through {@code isMarkedForLeaseSteal(true)}.</li>
 * </ul>
 *
 * @param leaseCounts number of leases held, keyed by worker identifier
 * @param needed number of additional leases this worker wants
 * @param target number of leases each worker is expected to hold
 * @return the leases chosen for stealing; empty (never {@code null}) when {@code leaseCounts} is
 *         empty or when the rules above yield nothing to steal
 */
private List<Lease> chooseLeasesToSteal(Map<String, Integer> leaseCounts, int needed, int target) {
    List<Lease> leasesToSteal = new ArrayList<>();

    // Find the most loaded worker. Strict '<' keeps the first entry seen when counts tie.
    Entry<String, Integer> mostLoadedWorker = null;
    for (Entry<String, Integer> worker : leaseCounts.entrySet()) {
        if (mostLoadedWorker == null || mostLoadedWorker.getValue() < worker.getValue()) {
            mostLoadedWorker = worker;
        }
    }

    // With no workers at all there is nobody to steal from; bail out instead of dereferencing null.
    if (mostLoadedWorker == null) {
        return leasesToSteal;
    }

    int numLeasesToSteal = 0;
    if ((mostLoadedWorker.getValue() >= target) && (needed > 0)) {
        int leasesOverTarget = mostLoadedWorker.getValue() - target;
        numLeasesToSteal = Math.min(needed, leasesOverTarget);
        // steal 1 if we need > 1 and max loaded worker has target leases.
        if ((needed > 1) && (numLeasesToSteal == 0)) {
            numLeasesToSteal = 1;
        }
        numLeasesToSteal = Math.min(numLeasesToSteal, maxLeasesToStealAtOneTime);
    }

    if (numLeasesToSteal <= 0) {
        log.debug("Worker {} not stealing from most loaded worker {}. He has {},"
                        + " target is {}, and I need {}",
                workerIdentifier,
                mostLoadedWorker.getKey(),
                mostLoadedWorker.getValue(),
                target,
                needed);
        return leasesToSteal;
    }

    log.debug("Worker {} will attempt to steal {} leases from most loaded worker {}. "
                    + " He has {} leases, target is {}, I need {}, maxLeasesToStealAtOneTime is {}.",
            workerIdentifier,
            numLeasesToSteal,
            mostLoadedWorker.getKey(),
            mostLoadedWorker.getValue(),
            target,
            needed,
            maxLeasesToStealAtOneTime);

    // Collect leases belonging to that worker
    String mostLoadedWorkerIdentifier = mostLoadedWorker.getKey();
    List<Lease> candidates = new ArrayList<>();
    for (Lease lease : allLeases.values()) {
        if (mostLoadedWorkerIdentifier.equals(lease.leaseOwner())) {
            candidates.add(lease);
        }
    }

    // Return random ones. The victim may own fewer leases in allLeases than we intended to take,
    // so clamp the count to what is actually available.
    Collections.shuffle(candidates);
    int toIndex = Math.min(candidates.size(), numLeasesToSteal);
    for (Lease lease : candidates.subList(0, toIndex)) {
        // The value returned by isMarkedForLeaseSteal(true) is what gets handed back to the caller
        // (presumably a fluent setter returning the same lease -- confirm against Lease).
        leasesToSteal.add(lease.isMarkedForLeaseSteal(true));
    }
    return leasesToSteal;
}