in amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTaker.java [380:496]
/**
 * Decides which leases this worker should try to take during the current pass.
 *
 * <p>The decision proceeds in this order:
 * <ol>
 *   <li>If {@code allLeases} is empty, nothing is taken.</li>
 *   <li>A per-worker target is computed as {@code ceil(numLeases / numWorkers)} (or 1 when there are at
 *       least as many workers as leases), capped at {@code maxLeasesForWorker} in the former case.</li>
 *   <li>If any lease in {@code allLeases} has gone without a counter increment for longer than
 *       {@code veryOldLeaseDurationNanosMultiplier * leaseDurationNanos}, a random selection of those
 *       leases is taken, ignoring the target but never exceeding {@code maxLeasesForWorker} in total.
 *       No other leases are considered in that case.</li>
 *   <li>Otherwise, if the worker is below target, it takes randomly chosen expired leases; only when
 *       there are no expired leases does it fall back to {@code chooseLeasesToSteal}.</li>
 * </ol>
 *
 * <p>Metrics describing the decision are always published to a fresh scope before returning.
 *
 * @param expiredLeases leases currently considered expired; must be a mutable list. It is shuffled in
 *                      place and the leases selected from it are removed from it.
 * @return the set of leases to attempt to take; possibly empty, never {@code null}
 */
private Set<Lease> computeLeasesToTake(List<Lease> expiredLeases) {
    Map<String, Integer> leaseCounts = computeLeaseCounts(expiredLeases);
    Set<Lease> leasesToTake = new HashSet<>();
    final MetricsScope scope = MetricsUtil.createMetricsWithOperation(metricsFactory, TAKE_LEASES_DIMENSION);
    MetricsUtil.addWorkerIdentifier(scope, workerIdentifier);

    // Captured up front because expiredLeases shrinks as leases are selected from it below.
    final int numAvailableLeases = expiredLeases.size();
    int numLeases = 0;
    int numWorkers = 0;
    int numLeasesToReachTarget = 0;
    int leaseSpillover = 0;
    int veryOldLeaseCount = 0;

    try {
        numLeases = allLeases.size();
        numWorkers = leaseCounts.size();

        if (numLeases == 0) {
            // If there are no leases, I shouldn't try to take any.
            return leasesToTake;
        }

        int target;
        if (numWorkers >= numLeases) {
            // If we have n leases and n or more workers, each worker can have up to 1 lease, including myself.
            target = 1;
        } else {
            /*
             * numWorkers must be < numLeases.
             *
             * Our target for each worker is numLeases / numWorkers (+1 if numWorkers doesn't evenly divide numLeases)
             */
            target = numLeases / numWorkers + (numLeases % numWorkers == 0 ? 0 : 1);

            // Spill over is the number of leases this worker should have claimed, but did not because it would
            // exceed the max allowed for this worker.
            leaseSpillover = Math.max(0, target - maxLeasesForWorker);
            if (target > maxLeasesForWorker) {
                log.warn(
                        "Worker {} target is {} leases and maxLeasesForWorker is {}. Resetting target to {},"
                                + " lease spillover is {}. Note that some shards may not be processed if no other "
                                + "workers are able to pick them up.",
                        workerIdentifier, target, maxLeasesForWorker, maxLeasesForWorker, leaseSpillover);
                target = maxLeasesForWorker;
            }
        }

        // Single lookup; a worker with no entry in the counts holds no leases.
        final int myCount = leaseCounts.getOrDefault(workerIdentifier, 0);
        numLeasesToReachTarget = target - myCount;

        // If there are leases that have been expired for an extended period of time, take them with
        // priority, disregarding the target computed above but obeying the maximum limit per worker.
        // The clock is read once so every lease is judged against the same instant; the comparison is
        // kept in "elapsed > threshold" form, as required for System.nanoTime() values.
        final long nowNanos = System.nanoTime();
        final List<Lease> veryOldLeases = allLeases.values().stream()
                .filter(lease -> nowNanos - lease.lastCounterIncrementNanos()
                        > veryOldLeaseDurationNanosMultiplier * leaseDurationNanos)
                .collect(Collectors.toList());

        if (!veryOldLeases.isEmpty()) {
            Collections.shuffle(veryOldLeases);
            veryOldLeaseCount = Math.max(0, Math.min(maxLeasesForWorker - myCount, veryOldLeases.size()));
            // Collect into leasesToTake (rather than a separate set) so that the "LeasesToTake" metric
            // published in the finally block reflects what is actually returned on this path.
            leasesToTake.addAll(veryOldLeases.subList(0, veryOldLeaseCount));
            if (veryOldLeaseCount > 0) {
                log.info("Taking leases that have been expired for a long time: {}", leasesToTake);
            }
            return leasesToTake;
        }

        if (numLeasesToReachTarget <= 0) {
            // If we don't need anything, return the empty set.
            return leasesToTake;
        }

        // Shuffle expiredLeases so workers don't all try to contend for the same leases.
        Collections.shuffle(expiredLeases);

        if (!expiredLeases.isEmpty()) {
            // If we have expired leases, get up to <needed> leases from expiredLeases. The chosen prefix is
            // removed from the list in a single range operation instead of repeated remove(0) calls.
            final int takeCount = Math.min(numLeasesToReachTarget, expiredLeases.size());
            final List<Lease> chosen = expiredLeases.subList(0, takeCount);
            leasesToTake.addAll(chosen);
            chosen.clear();
            numLeasesToReachTarget -= takeCount;
        } else {
            // If there are no expired leases and we need a lease, consider stealing.
            List<Lease> leasesToSteal = chooseLeasesToSteal(leaseCounts, numLeasesToReachTarget, target);
            for (Lease leaseToSteal : leasesToSteal) {
                log.info("Worker {} needed {} leases but none were expired, so it will steal lease {} from {}",
                        workerIdentifier, numLeasesToReachTarget, leaseToSteal.leaseKey(),
                        leaseToSteal.leaseOwner());
                leasesToTake.add(leaseToSteal);
            }
        }

        if (!leasesToTake.isEmpty()) {
            log.info(
                    "Worker {} saw {} total leases, {} available leases, {} "
                            + "workers. Target is {} leases, I have {} leases, I will take {} leases",
                    workerIdentifier, numLeases, numAvailableLeases, numWorkers, target, myCount,
                    leasesToTake.size());
        }
    } finally {
        // NOTE(review): "ExpiredLeases" is read after selected leases were removed from the list, so it
        // reports the expired leases left over rather than the number initially available.
        scope.addData("ExpiredLeases", expiredLeases.size(), StandardUnit.COUNT, MetricsLevel.SUMMARY);
        scope.addData("LeaseSpillover", leaseSpillover, StandardUnit.COUNT, MetricsLevel.SUMMARY);
        scope.addData("LeasesToTake", leasesToTake.size(), StandardUnit.COUNT, MetricsLevel.DETAILED);
        scope.addData("NeededLeases", Math.max(numLeasesToReachTarget, 0), StandardUnit.COUNT, MetricsLevel.DETAILED);
        scope.addData("NumWorkers", numWorkers, StandardUnit.COUNT, MetricsLevel.SUMMARY);
        scope.addData("TotalLeases", numLeases, StandardUnit.COUNT, MetricsLevel.DETAILED);
        scope.addData("VeryOldLeases", veryOldLeaseCount, StandardUnit.COUNT, MetricsLevel.SUMMARY);
        MetricsUtil.endScope(scope);
    }

    return leasesToTake;
}