private Set computeLeasesToTake()

in amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTaker.java [380:496]


    private Set<Lease> computeLeasesToTake(List<Lease> expiredLeases) {
        Map<String, Integer> leaseCounts = computeLeaseCounts(expiredLeases);
        Set<Lease> leasesToTake = new HashSet<>();
        final MetricsScope scope = MetricsUtil.createMetricsWithOperation(metricsFactory, TAKE_LEASES_DIMENSION);
        MetricsUtil.addWorkerIdentifier(scope, workerIdentifier);
        List<Lease> veryOldLeases = new ArrayList<>();

        final int numAvailableLeases = expiredLeases.size();
        int numLeases = 0;
        int numWorkers = 0;
        int numLeasesToReachTarget = 0;
        int leaseSpillover = 0;
        int veryOldLeaseCount = 0;

        try {
            numLeases = allLeases.size();
            numWorkers = leaseCounts.size();

            if (numLeases == 0) {
                // If there are no leases, I shouldn't try to take any.
                return leasesToTake;
            }


            int target;
            if (numWorkers >= numLeases) {
                // If we have n leases and n or more workers, each worker can have up to 1 lease, including myself.
                target = 1;
            } else {
                /*
                 * numWorkers must be < numLeases.
                 *
                 * Our target for each worker is numLeases / numWorkers (+1 if numWorkers doesn't evenly divide numLeases)
                 */
                target = numLeases / numWorkers + (numLeases % numWorkers == 0 ? 0 : 1);

                // Spill over is the number of leases this worker should have claimed, but did not because it would
                // exceed the max allowed for this worker.
                leaseSpillover = Math.max(0, target - maxLeasesForWorker);
                if (target > maxLeasesForWorker) {
                    log.warn(
                            "Worker {} target is {} leases and maxLeasesForWorker is {}. Resetting target to {},"
                                    + " lease spillover is {}. Note that some shards may not be processed if no other "
                                    + "workers are able to pick them up.",
                            workerIdentifier, target, maxLeasesForWorker, maxLeasesForWorker, leaseSpillover);
                    target = maxLeasesForWorker;
                }
            }

            int myCount = leaseCounts.get(workerIdentifier);
            numLeasesToReachTarget = target - myCount;

            int currentLeaseCount = leaseCounts.get(workerIdentifier);
            // If there are leases that have been expired for an extended period of
            // time, take them with priority, disregarding the target (computed
            // later) but obeying the maximum limit per worker.
            veryOldLeases = allLeases.values().stream()
                    .filter(lease -> System.nanoTime() - lease.lastCounterIncrementNanos()
                            > veryOldLeaseDurationNanosMultiplier * leaseDurationNanos)
                    .collect(Collectors.toList());

            if (!veryOldLeases.isEmpty()) {
                Collections.shuffle(veryOldLeases);
                veryOldLeaseCount = Math.max(0, Math.min(maxLeasesForWorker - currentLeaseCount, veryOldLeases.size()));
                HashSet<Lease> result = new HashSet<>(veryOldLeases.subList(0, veryOldLeaseCount));
                if (veryOldLeaseCount > 0) {
                    log.info("Taking leases that have been expired for a long time: {}", result);
                }
                return result;
            }

            if (numLeasesToReachTarget <= 0) {
                // If we don't need anything, return the empty set.
                return leasesToTake;
            }

            // Shuffle expiredLeases so workers don't all try to contend for the same leases.
            Collections.shuffle(expiredLeases);

            if (expiredLeases.size() > 0) {
                // If we have expired leases, get up to <needed> leases from expiredLeases
                for (; numLeasesToReachTarget > 0 && expiredLeases.size() > 0; numLeasesToReachTarget--) {
                    leasesToTake.add(expiredLeases.remove(0));
                }
            } else {
                // If there are no expired leases and we need a lease, consider stealing.
                List<Lease> leasesToSteal = chooseLeasesToSteal(leaseCounts, numLeasesToReachTarget, target);
                for (Lease leaseToSteal : leasesToSteal) {
                    log.info("Worker {} needed {} leases but none were expired, so it will steal lease {} from {}",
                            workerIdentifier, numLeasesToReachTarget, leaseToSteal.leaseKey(),
                            leaseToSteal.leaseOwner());
                    leasesToTake.add(leaseToSteal);
                }
            }

            if (!leasesToTake.isEmpty()) {
                log.info(
                        "Worker {} saw {} total leases, {} available leases, {} "
                                + "workers. Target is {} leases, I have {} leases, I will take {} leases",
                        workerIdentifier, numLeases, numAvailableLeases, numWorkers, target, myCount,
                        leasesToTake.size());
            }

        } finally {
            scope.addData("ExpiredLeases", expiredLeases.size(), StandardUnit.COUNT, MetricsLevel.SUMMARY);
            scope.addData("LeaseSpillover", leaseSpillover, StandardUnit.COUNT, MetricsLevel.SUMMARY);
            scope.addData("LeasesToTake", leasesToTake.size(), StandardUnit.COUNT, MetricsLevel.DETAILED);
            scope.addData("NeededLeases", Math.max(numLeasesToReachTarget, 0), StandardUnit.COUNT, MetricsLevel.DETAILED);
            scope.addData("NumWorkers", numWorkers, StandardUnit.COUNT, MetricsLevel.SUMMARY);
            scope.addData("TotalLeases", numLeases, StandardUnit.COUNT, MetricsLevel.DETAILED);
            scope.addData("VeryOldLeases", veryOldLeaseCount, StandardUnit.COUNT, MetricsLevel.SUMMARY);

            MetricsUtil.endScope(scope);
        }

        return leasesToTake;
    }