in amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTaker.java [156:244]
/**
 * Refreshes this worker's view of the leases and then attempts to take the leases selected by
 * {@code computeLeasesToTake}.
 *
 * <p>The method proceeds in two phases:
 * <ol>
 *   <li>Calls {@code updateAllLeases(timeProvider)}, retrying up to {@code SCAN_RETRIES} times when a
 *       {@link ProvisionedThroughputException} is thrown. The loop stops at the first successful attempt.
 *       If no attempt succeeds, the failure is logged and an empty map is returned.</li>
 *   <li>Computes the leases to take from the expired leases, passes them through
 *       {@code updateStaleLeasesWithLatestState} together with the time spent in phase one, and calls
 *       {@code leaseRefresher.takeLease} for each, retrying up to {@code TAKE_RETRIES} times on
 *       {@link ProvisionedThroughputException}.</li>
 * </ol>
 *
 * <p>"ListLeases" and "TakeLease" success/latency metrics and the "TakenLeases" count are recorded on a
 * metrics scope that is always ended before the method returns or throws.
 *
 * @param timeProvider forwarded unchanged to {@code updateAllLeases}; presumably supplies the current
 *        time so tests can control it (NOTE(review): confirm against {@code updateAllLeases})
 * @return the leases successfully taken by this worker, keyed by lease key; empty when the scan could not
 *         be completed or nothing was taken
 * @throws DependencyException propagated from the underlying lease operations
 * @throws InvalidStateException propagated from the underlying lease operations
 */
synchronized Map<String, Lease> takeLeases(Callable<Long> timeProvider)
        throws DependencyException, InvalidStateException {
    // Key is leaseKey
    final Map<String, Lease> takenLeases = new HashMap<>();
    final MetricsScope scope = MetricsUtil.createMetricsWithOperation(metricsFactory, TAKE_LEASES_DIMENSION);
    final long scanStartTime = System.currentTimeMillis();
    long updateAllLeasesTotalTimeMillis;
    boolean scanSucceeded = false;
    ProvisionedThroughputException lastException = null;
    try {
        try {
            for (int i = 1; i <= SCAN_RETRIES; i++) {
                try {
                    updateAllLeases(timeProvider);
                    scanSucceeded = true;
                    // A throttled earlier attempt must not abort the run once a retry has succeeded.
                    lastException = null;
                    // Stop retrying: without this the table would be rescanned SCAN_RETRIES times.
                    break;
                } catch (ProvisionedThroughputException e) {
                    log.info("Worker {} could not find expired leases on try {} out of {}", workerIdentifier, i,
                            SCAN_RETRIES);
                    lastException = e;
                }
            }
        } finally {
            // Recorded even when updateAllLeases throws a non-retryable exception.
            updateAllLeasesTotalTimeMillis = System.currentTimeMillis() - scanStartTime;
            MetricsUtil.addWorkerIdentifier(scope, workerIdentifier);
            MetricsUtil.addSuccessAndLatency(scope, "ListLeases", scanSucceeded, scanStartTime,
                    MetricsLevel.DETAILED);
        }

        if (lastException != null) {
            // Every scan attempt was throttled; give up for this run and report nothing taken.
            log.error("Worker {} could not scan leases table, aborting TAKE_LEASES_DIMENSION. Exception caught by"
                    + " last retry:", workerIdentifier, lastException);
            return takenLeases;
        }

        List<Lease> expiredLeases = getExpiredLeases();
        Set<Lease> leasesToTake = computeLeasesToTake(expiredLeases);
        leasesToTake = updateStaleLeasesWithLatestState(updateAllLeasesTotalTimeMillis, leasesToTake);

        final Set<String> untakenLeaseKeys = new HashSet<>();
        for (Lease lease : leasesToTake) {
            final String leaseKey = lease.leaseKey();
            final long takeStartTime = System.currentTimeMillis();
            boolean takeSucceeded = false;
            try {
                for (int i = 1; i <= TAKE_RETRIES; i++) {
                    try {
                        if (leaseRefresher.takeLease(lease, workerIdentifier)) {
                            lease.lastCounterIncrementNanos(System.nanoTime());
                            takenLeases.put(leaseKey, lease);
                        } else {
                            untakenLeaseKeys.add(leaseKey);
                        }
                        // The call completed (whether or not the lease was won), so no retry is needed.
                        takeSucceeded = true;
                        break;
                    } catch (ProvisionedThroughputException e) {
                        log.info("Could not take lease with key {} for worker {} on try {} out of {} due to"
                                + " capacity", leaseKey, workerIdentifier, i, TAKE_RETRIES);
                    }
                }
            } finally {
                MetricsUtil.addSuccessAndLatency(scope, "TakeLease", takeSucceeded, takeStartTime,
                        MetricsLevel.DETAILED);
            }
        }

        if (!takenLeases.isEmpty()) {
            log.info("Worker {} successfully took {} leases: {}", workerIdentifier, takenLeases.size(),
                    stringJoin(takenLeases.keySet(), ", "));
        }
        if (!untakenLeaseKeys.isEmpty()) {
            log.info("Worker {} failed to take {} leases: {}", workerIdentifier, untakenLeaseKeys.size(),
                    stringJoin(untakenLeaseKeys, ", "));
        }
        scope.addData("TakenLeases", takenLeases.size(), StandardUnit.COUNT, MetricsLevel.SUMMARY);
    } finally {
        MetricsUtil.endScope(scope);
    }
    return takenLeases;
}