in src/main/java/com/amazonaws/services/dynamodbv2/streamsadapter/leases/StreamsLeaseTaker.java [445:486]
private Map<String, T> takeLeases(List<T> leasesToTake) throws DependencyException, InvalidStateException {
// Key is leaseKey
Map<String, T> takenLeases = new HashMap<>();
Set<String> untakenLeaseKeys = new HashSet<>();
for (T lease : leasesToTake) {
String leaseKey = lease.getLeaseKey();
for (int i = 0; i < TAKE_RETRIES; i++) {
try {
if (leaseManager.takeLease(lease, workerIdentifier)) {
lease.setLastCounterIncrementNanos(System.nanoTime());
takenLeases.put(leaseKey, lease);
} else {
untakenLeaseKeys.add(leaseKey);
}
break;
} catch (ProvisionedThroughputException e) {
LOG.info(String.format("Could not take lease with key %s for worker %s on try %d out of %d due to capacity",
leaseKey,
workerIdentifier,
i,
TAKE_RETRIES));
}
}
}
if (takenLeases.size() > 0) {
LOG.info(String.format("Worker %s successfully took %d leases: %s",
workerIdentifier,
takenLeases.size(),
stringJoin(takenLeases.keySet(), ", ")));
}
if (untakenLeaseKeys.size() > 0) {
LOG.info(String.format("Worker %s failed to take %d leases: %s",
workerIdentifier,
untakenLeaseKeys.size(),
stringJoin(untakenLeaseKeys, ", ")));
}
return takenLeases;
}