synchronized Map takeLeases()

in amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTaker.java [156:244]


    /**
     * Runs one lease-taking pass for this worker and returns the leases it managed to take.
     *
     * <p>The pass has two phases:
     * <ol>
     *   <li>Call {@code updateAllLeases(timeProvider)}, retrying on {@link ProvisionedThroughputException} up to
     *       {@code SCAN_RETRIES} times and stopping at the first successful attempt. If no attempt succeeds, the
     *       last exception is logged and an empty map is returned.</li>
     *   <li>Select candidates via {@code getExpiredLeases()}, {@code computeLeasesToTake(...)} and
     *       {@code updateStaleLeasesWithLatestState(...)}, then call {@code leaseRefresher.takeLease(...)} for each
     *       candidate, retrying on {@link ProvisionedThroughputException} up to {@code TAKE_RETRIES} times.</li>
     * </ol>
     *
     * <p>Success/latency metrics are recorded under the "ListLeases" and "TakeLease" names, the number of taken
     * leases is recorded as "TakenLeases", and the metrics scope is always ended before returning.
     *
     * @param timeProvider passed through unchanged to {@code updateAllLeases}
     * @return map keyed by lease key of the leases taken during this pass; empty when nothing was taken or when the
     *         initial scan could not be completed
     * @throws DependencyException if raised by one of the invoked lease operations (not caught here)
     * @throws InvalidStateException if raised by one of the invoked lease operations (not caught here)
     */
    synchronized Map<String, Lease> takeLeases(Callable<Long> timeProvider)
        throws DependencyException, InvalidStateException {
        // Key is leaseKey
        Map<String, Lease> takenLeases = new HashMap<>();

        final MetricsScope scope = MetricsUtil.createMetricsWithOperation(metricsFactory, TAKE_LEASES_DIMENSION);

        long startTime = System.currentTimeMillis();
        long updateAllLeasesTotalTimeMillis;
        boolean success = false;

        ProvisionedThroughputException lastException = null;

        try {
            try {
                for (int i = 1; i <= SCAN_RETRIES; i++) {
                    try {
                        updateAllLeases(timeProvider);
                        success = true;
                        // A successful attempt supersedes any earlier throttled attempt; without clearing this the
                        // abort check below would discard a scan that actually completed.
                        lastException = null;
                        // Stop retrying once the scan has succeeded instead of re-scanning the table.
                        break;
                    } catch (ProvisionedThroughputException e) {
                        // Report the bound that actually governs this loop (SCAN_RETRIES, not TAKE_RETRIES).
                        log.info("Worker {} could not find expired leases on try {} out of {}", workerIdentifier, i,
                                SCAN_RETRIES);
                        lastException = e;
                    }
                }
            } finally {
                // Elapsed scan time is handed to updateStaleLeasesWithLatestState below.
                updateAllLeasesTotalTimeMillis = System.currentTimeMillis() - startTime;
                MetricsUtil.addWorkerIdentifier(scope, workerIdentifier);
                MetricsUtil.addSuccessAndLatency(scope, "ListLeases", success, startTime, MetricsLevel.DETAILED);
            }

            // Non-null only when every scan attempt ended in a ProvisionedThroughputException.
            if (lastException != null) {
                log.error("Worker {} could not scan leases table, aborting TAKE_LEASES_DIMENSION. Exception caught by"
                        + " last retry:", workerIdentifier, lastException);
                return takenLeases;
            }

            List<Lease> expiredLeases = getExpiredLeases();

            Set<Lease> leasesToTake = computeLeasesToTake(expiredLeases);
            leasesToTake = updateStaleLeasesWithLatestState(updateAllLeasesTotalTimeMillis, leasesToTake);

            Set<String> untakenLeaseKeys = new HashSet<>();

            for (Lease lease : leasesToTake) {
                String leaseKey = lease.leaseKey();

                // Reset per lease so each "TakeLease" metric covers only this lease's attempts.
                startTime = System.currentTimeMillis();
                success = false;
                try {
                    for (int i = 1; i <= TAKE_RETRIES; i++) {
                        try {
                            if (leaseRefresher.takeLease(lease, workerIdentifier)) {
                                lease.lastCounterIncrementNanos(System.nanoTime());
                                takenLeases.put(leaseKey, lease);
                            } else {
                                untakenLeaseKeys.add(leaseKey);
                            }

                            // The call completed (whether or not the lease was obtained), so no retry is needed.
                            success = true;
                            break;
                        } catch (ProvisionedThroughputException e) {
                            // If every attempt is throttled the lease ends up in neither collection; only the
                            // "TakeLease" metric (success == false) records it.
                            log.info("Could not take lease with key {} for worker {} on try {} out of {} due to"
                                            + " capacity", leaseKey, workerIdentifier, i, TAKE_RETRIES);
                        }
                    }
                } finally {
                    MetricsUtil.addSuccessAndLatency(scope, "TakeLease", success, startTime, MetricsLevel.DETAILED);
                }
            }

            if (!takenLeases.isEmpty()) {
                log.info("Worker {} successfully took {} leases: {}", workerIdentifier, takenLeases.size(),
                        stringJoin(takenLeases.keySet(), ", "));
            }

            if (!untakenLeaseKeys.isEmpty()) {
                log.info("Worker {} failed to take {} leases: {}", workerIdentifier, untakenLeaseKeys.size(),
                        stringJoin(untakenLeaseKeys, ", "));
            }

            scope.addData("TakenLeases", takenLeases.size(), StandardUnit.COUNT, MetricsLevel.SUMMARY);
        } finally {
            MetricsUtil.endScope(scope);
        }

        return takenLeases;
    }