public LeaseCleanupResult cleanupLease()

in amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseCleanupManager.java [156:223]


    public LeaseCleanupResult cleanupLease(LeasePendingDeletion leasePendingDeletion,
            boolean timeToCheckForCompletedShard, boolean timeToCheckForGarbageShard) throws TimeoutException,
            InterruptedException, DependencyException, ProvisionedThroughputException, InvalidStateException {
        final Lease lease = leasePendingDeletion.lease();
        final ShardInfo shardInfo = leasePendingDeletion.shardInfo();
        final StreamIdentifier streamIdentifier = leasePendingDeletion.streamIdentifier();

        final AWSExceptionManager exceptionManager = createExceptionManager();

        boolean cleanedUpCompletedLease = false;
        boolean cleanedUpGarbageLease = false;
        boolean alreadyCheckedForGarbageCollection = false;
        boolean wereChildShardsPresent = false;
        boolean wasResourceNotFound = false;

        try {
            if (cleanupLeasesUponShardCompletion && timeToCheckForCompletedShard) {
                final Lease leaseFromDDB = leaseCoordinator.leaseRefresher().getLease(lease.leaseKey());
                if(leaseFromDDB != null) {
                    Set<String> childShardKeys = leaseFromDDB.childShardIds();
                    if (CollectionUtils.isNullOrEmpty(childShardKeys)) {
                        try {
                            childShardKeys = leasePendingDeletion.getChildShardsFromService();

                            if (CollectionUtils.isNullOrEmpty(childShardKeys)) {
                                log.error(
                                        "No child shards returned from service for shard {} for {} while cleaning up lease.",
                                        shardInfo.shardId(), streamIdentifier.streamName());
                            } else {
                                wereChildShardsPresent = true;
                                updateLeaseWithChildShards(leasePendingDeletion, childShardKeys);
                            }
                        } catch (ExecutionException e) {
                            throw exceptionManager.apply(e.getCause());
                        } finally {
                            alreadyCheckedForGarbageCollection = true;
                        }
                    } else {
                        wereChildShardsPresent = true;
                    }
                    try {
                        cleanedUpCompletedLease = cleanupLeaseForCompletedShard(lease, shardInfo, childShardKeys);
                    } catch (Exception e) {
                        // Suppressing the exception here, so that we can attempt for garbage cleanup.
                        log.warn("Unable to cleanup lease for shard {} in {}", shardInfo.shardId(), streamIdentifier.streamName(), e);
                    }
                } else {
                    log.info("Lease not present in lease table while cleaning the shard {} of {}", shardInfo.shardId(), streamIdentifier.streamName());
                    cleanedUpCompletedLease = true;
                }
            }

            if (!alreadyCheckedForGarbageCollection && timeToCheckForGarbageShard) {
                try {
                    wereChildShardsPresent = !CollectionUtils
                            .isNullOrEmpty(leasePendingDeletion.getChildShardsFromService());
                } catch (ExecutionException e) {
                    throw exceptionManager.apply(e.getCause());
                }
            }
        } catch (ResourceNotFoundException e) {
            wasResourceNotFound = true;
            cleanedUpGarbageLease = cleanupLeaseForGarbageShard(lease, e);
        }

        return new LeaseCleanupResult(cleanedUpCompletedLease, cleanedUpGarbageLease, wereChildShardsPresent,
                wasResourceNotFound);
    }