in amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseCleanupManager.java [156:223]
/**
 * Runs one cleanup pass for the lease described by {@code leasePendingDeletion}.
 *
 * <p>The pass has up to two phases, both guarded by a single handler for
 * {@link ResourceNotFoundException}:
 * <ol>
 *   <li><b>Completed-shard check</b> (only when {@code cleanupLeasesUponShardCompletion} and
 *       {@code timeToCheckForCompletedShard} are both true): the lease is re-read through the lease
 *       refresher. If it is no longer there, the completed-lease cleanup is reported as done. Otherwise the
 *       child shard ids recorded on the stored lease are used; when none are recorded they are requested
 *       via {@code getChildShardsFromService()} and, if any come back, written to the lease via
 *       {@code updateLeaseWithChildShards}. {@code cleanupLeaseForCompletedShard} is then attempted, and
 *       any exception it raises is logged and suppressed.</li>
 *   <li><b>Garbage-shard check</b> (only when {@code timeToCheckForGarbageShard} is true and phase one did
 *       not already call {@code getChildShardsFromService()}): the child shards are requested solely to
 *       record whether any exist.</li>
 * </ol>
 *
 * <p>An {@link ExecutionException} raised around either {@code getChildShardsFromService()} call is
 * translated by passing its cause to the exception manager and throwing the result. If a
 * {@link ResourceNotFoundException} escapes either phase, the result is flagged accordingly and
 * {@code cleanupLeaseForGarbageShard} is invoked.
 *
 * @param leasePendingDeletion         entry supplying the lease, shard info, stream identifier and the
 *                                     child-shard lookup
 * @param timeToCheckForCompletedShard whether the completed-shard phase may run in this pass
 * @param timeToCheckForGarbageShard   whether the garbage-shard phase may run in this pass
 * @return a {@link LeaseCleanupResult} built from, in order: completed-lease cleaned up, garbage-lease
 *         cleaned up, child shards present, resource not found
 * @throws TimeoutException               declared; not thrown directly by this method
 * @throws InterruptedException           declared; not thrown directly by this method
 * @throws DependencyException            declared; not thrown directly by this method
 * @throws ProvisionedThroughputException declared; not thrown directly by this method
 * @throws InvalidStateException          declared; not thrown directly by this method
 */
public LeaseCleanupResult cleanupLease(LeasePendingDeletion leasePendingDeletion,
        boolean timeToCheckForCompletedShard, boolean timeToCheckForGarbageShard)
        throws TimeoutException, InterruptedException, DependencyException, ProvisionedThroughputException,
        InvalidStateException {
    final Lease pendingLease = leasePendingDeletion.lease();
    final ShardInfo pendingShardInfo = leasePendingDeletion.shardInfo();
    final StreamIdentifier pendingStreamId = leasePendingDeletion.streamIdentifier();
    final AWSExceptionManager awsExceptions = createExceptionManager();

    // Outcome flags; each one maps onto a LeaseCleanupResult constructor argument,
    // except serviceAlreadyQueried which only gates the second phase below.
    boolean completedLeaseRemoved = false;
    boolean garbageLeaseRemoved = false;
    boolean serviceAlreadyQueried = false;
    boolean childShardsFound = false;
    boolean resourceMissing = false;

    try {
        if (cleanupLeasesUponShardCompletion && timeToCheckForCompletedShard) {
            final Lease storedLease = leaseCoordinator.leaseRefresher().getLease(pendingLease.leaseKey());
            if (storedLease == null) {
                log.info("Lease not present in lease table while cleaning the shard {} of {}",
                        pendingShardInfo.shardId(), pendingStreamId.streamName());
                completedLeaseRemoved = true;
            } else {
                Set<String> childKeys = storedLease.childShardIds();
                if (!CollectionUtils.isNullOrEmpty(childKeys)) {
                    // The stored lease already records its children; no service call needed.
                    childShardsFound = true;
                } else {
                    try {
                        childKeys = leasePendingDeletion.getChildShardsFromService();
                        if (!CollectionUtils.isNullOrEmpty(childKeys)) {
                            childShardsFound = true;
                            updateLeaseWithChildShards(leasePendingDeletion, childKeys);
                        } else {
                            log.error(
                                    "No child shards returned from service for shard {} for {} while cleaning up lease.",
                                    pendingShardInfo.shardId(), pendingStreamId.streamName());
                        }
                    } catch (ExecutionException serviceFailure) {
                        throw awsExceptions.apply(serviceFailure.getCause());
                    } finally {
                        // Set on every exit path so the second phase never repeats the lookup.
                        serviceAlreadyQueried = true;
                    }
                }

                try {
                    completedLeaseRemoved =
                            cleanupLeaseForCompletedShard(pendingLease, pendingShardInfo, childKeys);
                } catch (Exception cleanupFailure) {
                    // Deliberately swallowed so the garbage-shard phase can still be attempted.
                    log.warn("Unable to cleanup lease for shard {} in {}", pendingShardInfo.shardId(), pendingStreamId.streamName(), cleanupFailure);
                }
            }
        }

        if (timeToCheckForGarbageShard && !serviceAlreadyQueried) {
            try {
                final Set<String> serviceChildKeys = leasePendingDeletion.getChildShardsFromService();
                childShardsFound = !CollectionUtils.isNullOrEmpty(serviceChildKeys);
            } catch (ExecutionException serviceFailure) {
                throw awsExceptions.apply(serviceFailure.getCause());
            }
        }
    } catch (ResourceNotFoundException notFound) {
        resourceMissing = true;
        garbageLeaseRemoved = cleanupLeaseForGarbageShard(pendingLease, notFound);
    }

    return new LeaseCleanupResult(completedLeaseRemoved, garbageLeaseRemoved, childShardsFound,
            resourceMissing);
}