private void cleanupGarbageLeases()

in src/main/java/com/amazonaws/services/dynamodbv2/streamsadapter/DynamoDBStreamsShardSyncer.java [600:638]


    private void cleanupGarbageLeases(List<Shard> shards,
        List<KinesisClientLease> trackedLeases,
        IKinesisProxy kinesisProxy,
        ILeaseManager<KinesisClientLease> leaseManager)
        throws KinesisClientLibIOException, DependencyException, InvalidStateException, ProvisionedThroughputException {
        LOG.info("cleanupGarbageLeases: begin");
        Set<String> kinesisShards = new HashSet<>();
        for (Shard shard : shards) {
            kinesisShards.add(shard.getShardId());
        }

        // Check if there are leases for non-existent shards
        List<KinesisClientLease> garbageLeases = new ArrayList<>();
        for (KinesisClientLease lease : trackedLeases) {
            if (leaseCleanupValidator.isCandidateForCleanup(lease, kinesisShards)) {
                garbageLeases.add(lease);
            }
        }

        if (!garbageLeases.isEmpty()) {
            LOG.info("Found " + garbageLeases.size()
                + " candidate leases for cleanup. Refreshing list of"
                + " Kinesis shards to pick up recent/latest shards");
            List<Shard> currentShardList = getShardList(kinesisProxy);
            Set<String> currentKinesisShardIds = new HashSet<>();
            for (Shard shard : currentShardList) {
                currentKinesisShardIds.add(shard.getShardId());
            }

            for (KinesisClientLease lease : garbageLeases) {
                if (leaseCleanupValidator.isCandidateForCleanup(lease, currentKinesisShardIds)) {
                    LOG.info("Deleting lease for shard " + lease.getLeaseKey()
                        + " as it is not present in Kinesis stream.");
                    leaseManager.deleteLease(lease);
                }
            }
        }
        LOG.info("cleanupGarbageLeases: done");
    }