in src/main/java/com/amazonaws/services/dynamodbv2/streamsadapter/leases/StreamsLeaseTaker.java [165:219]
/**
 * Refreshes the in-memory lease view ({@code allLeases}) from a fresh
 * {@code leaseManager.listLeases()} call and assigns each listed lease its
 * {@code lastCounterIncrementNanos} value:
 * <ul>
 *   <li>lease seen before, counter unchanged: the value is carried over from the previous copy;</li>
 *   <li>lease seen before, counter changed: the value becomes the time of this scan;</li>
 *   <li>lease not seen before and unowned: the value becomes {@code 0L};</li>
 *   <li>lease not seen before and owned: the value becomes the time of this scan.</li>
 * </ul>
 * Keys that were in {@code allLeases} before the call but are missing from the fresh
 * listing are removed at the end.
 *
 * @param timeProvider invoked once, after the listing is fetched; its result is stored in
 *        {@code lastScanTimeNanos} and used as "the time of this scan"
 * @throws DependencyException if {@code timeProvider} throws (the cause is wrapped)
 * @throws InvalidStateException declared; not raised by this method's own statements,
 *         presumably propagated from {@code leaseManager.listLeases()}
 * @throws ProvisionedThroughputException declared; not raised by this method's own statements,
 *         presumably propagated from {@code leaseManager.listLeases()}
 */
private void updateAllLeases(Callable<Long> timeProvider)
    throws DependencyException, InvalidStateException, ProvisionedThroughputException {
    final List<T> scannedLeases = leaseManager.listLeases();
    try {
        lastScanTimeNanos = timeProvider.call();
    } catch (Exception e) {
        throw new DependencyException("Exception caught from timeProvider", e);
    }

    // Start with every key we currently know about; whatever is still in this set
    // after walking the fresh listing was not returned by the scan.
    final Set<String> staleKeys = new HashSet<String>(allLeases.keySet());

    for (T scanned : scannedLeases) {
        final String leaseKey = scanned.getLeaseKey();
        // put() hands back the copy we held before this scan (null if the key is new).
        final T previous = allLeases.put(leaseKey, scanned);
        staleKeys.remove(leaseKey);

        if (previous != null) {
            // Known lease: an unchanged counter keeps the earlier timestamp,
            // a changed counter is stamped with the time of this scan.
            final boolean counterUnchanged = previous.getLeaseCounter().equals(scanned.getLeaseCounter());
            if (counterUnchanged) {
                scanned.setLastCounterIncrementNanos(previous.getLastCounterIncrementNanos());
            } else {
                scanned.setLastCounterIncrementNanos(lastScanTimeNanos);
            }
            continue;
        }

        if (scanned.getLeaseOwner() != null) {
            // First sighting of an owned lease: treat it as renewed as of this scan.
            scanned.setLastCounterIncrementNanos(lastScanTimeNanos);
            if (LOG.isDebugEnabled()) {
                LOG.debug("Treating new lease with key " + leaseKey
                    + " as recently renewed because it is new and owned.");
            }
            continue;
        }

        // First sighting of an unowned lease: it has never been renewed.
        scanned.setLastCounterIncrementNanos(0L);
        if (LOG.isDebugEnabled()) {
            LOG.debug("Treating new lease with key " + leaseKey
                + " as never renewed because it is new and unowned.");
        }
    }

    // Drop every lease the fresh listing no longer contains.
    allLeases.keySet().removeAll(staleKeys);
}